This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f7f47d362587076431da98ada957eb63df6935d Author: Ingo Bürk <[email protected]> AuthorDate: Wed Jul 7 09:20:06 2021 +0200 [FLINK-23067][table-api-java] Introduce Table#executeInsert(TableDescriptor) This closes #16290. --- .../java/org/apache/flink/table/api/Table.java | 59 ++++++++++++++++++++++ .../apache/flink/table/api/internal/TableImpl.java | 13 +++++ .../runtime/stream/table/TableSinkITCase.scala | 23 ++++++++- 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 6c8fb79..e5a75a9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1356,6 +1356,65 @@ public interface Table { TableResult executeInsert(String tablePath, boolean overwrite); /** + * Declares that the pipeline defined by the given {@link Table} object should be written to a + * table defined by a {@link TableDescriptor}. It executes the insert operation. + * + * <p>The {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous) + * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String, + * TableDescriptor)}) using a unique identifier. Note that calling this method multiple times, + * even with the same descriptor, results in multiple sink tables being registered. + * + * <p>Examples: + * + * <pre>{@code + * Schema schema = Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build(); + * + * Table table = tableEnv.from(TableDescriptor.forConnector("datagen") + * .schema(schema) + * .build()); + * + * table.executeInsert(TableDescriptor.forConnector("blackhole") + * .schema(schema) + * .build()); + * }</pre> + * + * @param descriptor Descriptor describing the sink table into which data should be inserted. + */ + TableResult executeInsert(TableDescriptor descriptor); + + /** + * Declares that the pipeline defined by the given {@link Table} object should be written to a + * table defined by a {@link TableDescriptor}. It executes the insert operation. + * + * <p>The {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous) + * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String, + * TableDescriptor)}) using a unique identifier. Note that calling this method multiple times, + * even with the same descriptor, results in multiple sink tables being registered. + * + * <p>Examples: + * + * <pre>{@code + * Schema schema = Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build(); + * + * Table table = tableEnv.from(TableDescriptor.forConnector("datagen") + * .schema(schema) + * .build()); + * + * table.executeInsert(TableDescriptor.forConnector("blackhole") + * .schema(schema) + * .build(), true); + * }</pre> + * + * @param descriptor Descriptor describing the sink table into which data should be inserted. + * @param overwrite Indicates whether existing data should be overwritten. + */ + TableResult executeInsert(TableDescriptor descriptor, boolean overwrite); + + /** * Collects the contents of the current table local client. * * <pre>{@code diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 437e656..73eb707 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.GroupedTable; import org.apache.flink.table.api.OverWindow; import org.apache.flink.table.api.OverWindowedTable; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; @@ -573,6 +574,18 @@ public class TableImpl implements Table { } @Override + public TableResult executeInsert(TableDescriptor descriptor) { + return executeInsert(descriptor, false); + } + + @Override + public TableResult executeInsert(TableDescriptor descriptor, boolean overwrite) { + final String path = TableDescriptorUtil.getUniqueAnonymousPath(); + tableEnvironment.createTemporaryTable(path, descriptor); + return executeInsert(path, overwrite); + } + + @Override public TableResult execute() { return tableEnvironment.executeInternal(getQueryOperation()); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala index aa52f73..9c360cc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala @@ -23,6 +23,7 @@ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow +import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.StreamingTestBase import org.apache.flink.table.planner.runtime.utils.TestData.{data1, nullData4, smallTupleData3, tupleData2, tupleData3, tupleData5} import org.apache.flink.table.utils.LegacyRowResource @@ -35,7 +36,6 @@ import java.io.File import java.lang.{Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.util.concurrent.atomic.AtomicInteger - import scala.collection.JavaConversions._ import scala.collection.{Seq, mutable} import scala.io.Source @@ -1358,4 +1358,25 @@ class TableSinkITCase extends StreamingTestBase { |""".stripMargin) tEnv.executeSql(s"INSERT INTO $sinkTableName SELECT * FROM $sourceTableName").await() } + + @Test + def testExecuteInsertToTableDescriptor(): Unit = { + val schema = Schema.newBuilder() + .column("f0", DataTypes.INT()) + .build(); + + val tableId = TestValuesTableFactory.registerData(Seq(row(42))) + tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values") + .schema(schema) + .option("data-id", tableId) + .option("bounded", "true") + .build()) + + val tableResult = tEnv.from("T").executeInsert(TableDescriptor.forConnector("values") + .schema(schema) + .build()) + + tableResult.await() + assertEquals(Seq("+I(42)"), TestValuesTableFactory.getOnlyRawResults.toList) + } }
