This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f7f47d362587076431da98ada957eb63df6935d
Author: Ingo Bürk <[email protected]>
AuthorDate: Wed Jul 7 09:20:06 2021 +0200

    [FLINK-23067][table-api-java] Introduce Table#executeInsert(TableDescriptor)
    
    This closes #16290.
---
 .../java/org/apache/flink/table/api/Table.java     | 59 ++++++++++++++++++++++
 .../apache/flink/table/api/internal/TableImpl.java | 13 +++++
 .../runtime/stream/table/TableSinkITCase.scala     | 23 ++++++++-
 3 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 6c8fb79..e5a75a9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1356,6 +1356,65 @@ public interface Table {
     TableResult executeInsert(String tablePath, boolean overwrite);
 
     /**
+     * Declares that the pipeline defined by the given {@link Table} object should be written to a
+     * table defined by a {@link TableDescriptor}. It executes the insert operation.
+     *
+     * <p>The {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
+     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
+     * TableDescriptor)}) using a unique identifier. Note that calling this method multiple times,
+     * even with the same descriptor, results in multiple sink tables being registered.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * Schema schema = Schema.newBuilder()
+     *   .column("f0", DataTypes.STRING())
+     *   .build();
+     *
+     * Table table = tableEnv.from(TableDescriptor.forConnector("datagen")
+     *   .schema(schema)
+     *   .build());
+     *
+     * table.executeInsert(TableDescriptor.forConnector("blackhole")
+     *   .schema(schema)
+     *   .build());
+     * }</pre>
+     *
+     * @param descriptor Descriptor describing the sink table into which data should be inserted.
+     */
+    TableResult executeInsert(TableDescriptor descriptor);
+
+    /**
+     * Declares that the pipeline defined by the given {@link Table} object should be written to a
+     * table defined by a {@link TableDescriptor}. It executes the insert operation.
+     *
+     * <p>The {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
+     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
+     * TableDescriptor)}) using a unique identifier. Note that calling this method multiple times,
+     * even with the same descriptor, results in multiple sink tables being registered.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * Schema schema = Schema.newBuilder()
+     *   .column("f0", DataTypes.STRING())
+     *   .build();
+     *
+     * Table table = tableEnv.from(TableDescriptor.forConnector("datagen")
+     *   .schema(schema)
+     *   .build());
+     *
+     * table.executeInsert(TableDescriptor.forConnector("blackhole")
+     *   .schema(schema)
+     *   .build(), true);
+     * }</pre>
+     *
+     * @param descriptor Descriptor describing the sink table into which data should be inserted.
+     * @param overwrite Indicates whether existing data should be overwritten.
+     */
+    TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);
+
+    /**
      * Collects the contents of the current table to the local client.
      *
      * <pre>{@code
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 437e656..73eb707 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.GroupedTable;
 import org.apache.flink.table.api.OverWindow;
 import org.apache.flink.table.api.OverWindowedTable;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
@@ -573,6 +574,18 @@ public class TableImpl implements Table {
     }
 
     @Override
+    public TableResult executeInsert(TableDescriptor descriptor) {
+        return executeInsert(descriptor, false);
+    }
+
+    @Override
+    public TableResult executeInsert(TableDescriptor descriptor, boolean overwrite) {
+        final String path = TableDescriptorUtil.getUniqueAnonymousPath();
+        tableEnvironment.createTemporaryTable(path, descriptor);
+        return executeInsert(path, overwrite);
+    }
+
+    @Override
     public TableResult execute() {
         return tableEnvironment.executeInternal(getQueryOperation());
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index aa52f73..9c360cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData.{data1, nullData4, smallTupleData3, tupleData2, tupleData3, tupleData5}
 import org.apache.flink.table.utils.LegacyRowResource
@@ -35,7 +36,6 @@ import java.io.File
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.JavaConversions._
 import scala.collection.{Seq, mutable}
 import scala.io.Source
@@ -1358,4 +1358,25 @@ class TableSinkITCase extends StreamingTestBase {
          |""".stripMargin)
     tEnv.executeSql(s"INSERT INTO $sinkTableName SELECT * FROM $sourceTableName").await()
   }
+
+  @Test
+  def testExecuteInsertToTableDescriptor(): Unit = {
+    val schema = Schema.newBuilder()
+      .column("f0", DataTypes.INT())
+      .build()
+
+    val tableId = TestValuesTableFactory.registerData(Seq(row(42)))
+    tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
+      .schema(schema)
+      .option("data-id", tableId)
+      .option("bounded", "true")
+      .build())
+
+    val tableResult = tEnv.from("T").executeInsert(TableDescriptor.forConnector("values")
+      .schema(schema)
+      .build())
+
+    tableResult.await()
+    assertEquals(Seq("+I(42)"), TestValuesTableFactory.getOnlyRawResults.toList)
+  }
 }
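
For context, a minimal end-to-end sketch of the new API introduced by this commit. The TableEnvironment setup and the datagen "number-of-rows" option are illustrative assumptions, not part of the patch; the connector choices follow the Javadoc examples above:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class ExecuteInsertDescriptorExample {

    public static void main(String[] args) throws Exception {
        // Standard TableEnvironment setup (assumed boilerplate, not part of this commit).
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        Schema schema = Schema.newBuilder()
                .column("f0", DataTypes.STRING())
                .build();

        // Source: an inline (anonymous) datagen table; the bounded row count
        // is an illustrative option so the pipeline terminates.
        Table table = tableEnv.from(TableDescriptor.forConnector("datagen")
                .schema(schema)
                .option("number-of-rows", "10")
                .build());

        // Sink: the descriptor is registered under a unique anonymous path
        // behind the scenes, and the INSERT is executed immediately;
        // await() blocks until the job finishes.
        table.executeInsert(TableDescriptor.forConnector("blackhole")
                .schema(schema)
                .build())
                .await();
    }
}

Note that, as the Javadoc states, each call registers a fresh anonymous sink table, so reusing the same descriptor across multiple executeInsert calls creates multiple temporary catalog entries.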
