[
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648478#comment-16648478
]
ASF GitHub Bot commented on FLINK-10156:
----------------------------------------
asfgit closed pull request #6805: [FLINK-10156][table] Deprecate
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 146d1a6c1fd..622fe9ffedf 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -46,6 +46,8 @@ StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...); // or
tableEnv.registerExternalCatalog("extCat", ...);
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
@@ -53,7 +55,7 @@ Table tapiResult = tableEnv.scan("table1").select(...);
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...);
+tapiResult.insertInto("outputTable");
// execute
env.execute();
@@ -72,7 +74,9 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
-tableEnv.registerExternalCatalog("extCat", ...)
+tableEnv.registerExternalCatalog("extCat", ...)
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
@@ -80,7 +84,7 @@ val tapiResult = tableEnv.scan("table1").select(...)
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...)
+tapiResult.insertInto("outputTable")
// execute
env.execute()
@@ -500,10 +504,7 @@ A batch `Table` can only be written to a `BatchTableSink`,
while a streaming `Ta
Please see the documentation about [Table Sources & Sinks]({{ site.baseurl
}}/dev/table/sourceSinks.html) for details about available sinks and
instructions for how to implement a custom `TableSink`.
-There are two ways to emit a table:
-
-1. The `Table.writeToSink(TableSink sink)` method emits the table using the
provided `TableSink` and automatically configures the sink with the schema of
the table to emit.
-2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that
was registered with a specific schema under the provided name in the
`TableEnvironment`'s catalog. The schema of the table to emit is validated
against the schema of the registered `TableSink`.
+The `Table.insertInto(String tableName)` method emits the `Table` to a
registered `TableSink`. The method looks up the `TableSink` from the catalog by
the name and validates that the schema of the `Table` is identical to the
schema of the `TableSink`.
The following examples shows how to emit a `Table`:
@@ -513,22 +514,17 @@ The following examples shows how to emit a `Table`:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-// compute a result Table using Table API operators and/or SQL queries
-Table result = ...
-
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
-// METHOD 1:
-// Emit the result Table to the TableSink via the writeToSink() method
-result.writeToSink(sink);
-
-// METHOD 2:
-// Register the TableSink with a specific schema
+// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
-// Emit the result Table to the registered TableSink via the insertInto()
method
+
+// compute a result Table using Table API operators and/or SQL queries
+Table result = ...
+// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");
// execute the program
@@ -540,22 +536,18 @@ result.insertInto("CsvSinkTable");
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
-// compute a result Table using Table API operators and/or SQL queries
-val result: Table = ...
-
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
-// METHOD 1:
-// Emit the result Table to the TableSink via the writeToSink() method
-result.writeToSink(sink)
-
-// METHOD 2:
-// Register the TableSink with a specific schema
+// register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING,
Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
-// Emit the result Table to the registered TableSink via the insertInto()
method
+
+// compute a result Table using Table API operators and/or SQL queries
+val result: Table = ...
+
+// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")
// execute the program
@@ -576,7 +568,7 @@ Table API and SQL queries are translated into
[DataStream]({{ site.baseurl }}/de
A Table API or SQL query is translated when:
-* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or
`Table.insertInto()` is called.
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is
called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()`
is called.
* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration
with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
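For readers migrating existing pipelines, here is a minimal before/after sketch of the workflow change documented above (the table name, sink path, and schema are illustrative and not taken from the PR):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.sinks.CsvTableSink;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    Table result = tableEnv.scan("table1").select("a, b, c");

    // before (deprecated by this PR): pass the sink instance directly
    // result.writeToSink(new CsvTableSink("/path/to/file", "|"));

    // after: register the sink under a name, then emit by name
    tableEnv.registerTableSink(
        "outputTable",
        new String[]{"a", "b", "c"},
        new TypeInformation[]{Types.INT, Types.STRING, Types.LONG},
        new CsvTableSink("/path/to/file", "|"));
    result.insertInto("outputTable");

    env.execute();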
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 0b7d8948f79..2958e2201dd 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1047,30 +1047,42 @@ The sink only supports append-only streaming tables. It
cannot be used to emit a
<div data-lang="java" markdown="1">
{% highlight java %}
-Table table = ...
-
-table.writeToSink(
- new CsvTableSink(
+CsvTableSink sink = new CsvTableSink(
path, // output path
"|", // optional: delimit files by '|'
1, // optional: write to a single file
- WriteMode.OVERWRITE)); // optional: override existing files
+ WriteMode.OVERWRITE); // optional: override existing files
+
+tableEnv.registerTableSink(
+  "csvOutputTable",
+  // specify table schema
+  new String[]{"f0", "f1"},
+  new TypeInformation[]{Types.STRING, Types.INT},
+  sink);
+Table table = ...
+table.insertInto("csvOutputTable");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val table: Table = ???
-
-table.writeToSink(
- new CsvTableSink(
+val sink: CsvTableSink = new CsvTableSink(
path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
- writeMode = WriteMode.OVERWRITE)) // optional: override existing files
+ writeMode = WriteMode.OVERWRITE) // optional: override existing files
+tableEnv.registerTableSink(
+  "csvOutputTable",
+  // specify table schema
+  Array[String]("f0", "f1"),
+  Array[TypeInformation[_]](Types.STRING, Types.INT),
+  sink)
+
+val table: Table = ???
+table.insertInto("csvOutputTable")
{% endhighlight %}
</div>
</div>
@@ -1094,8 +1106,15 @@ JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setParameterTypes(INT_TYPE_INFO)
.build();
+tableEnv.registerTableSink(
+  "jdbcOutputTable",
+  // specify table schema
+  new String[]{"id"},
+  new TypeInformation[]{Types.INT},
+  sink);
+
Table table = ...
-table.writeToSink(sink);
+table.insertInto("jdbcOutputTable");
{% endhighlight %}
</div>
@@ -1108,8 +1127,15 @@ val sink: JDBCAppendTableSink =
JDBCAppendTableSink.builder()
.setParameterTypes(INT_TYPE_INFO)
.build()
+tableEnv.registerTableSink(
+  "jdbcOutputTable",
+  // specify table schema
+  Array[String]("id"),
+  Array[TypeInformation[_]](Types.INT),
+  sink)
+
val table: Table = ???
-table.writeToSink(sink)
+table.insertInto("jdbcOutputTable")
{% endhighlight %}
</div>
</div>
@@ -1137,8 +1163,15 @@ CassandraAppendTableSink sink = new
CassandraAppendTableSink(
// the query must match the schema of the table
INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));
+tableEnv.registerTableSink(
+  "cassandraOutputTable",
+  // specify table schema
+  new String[]{"id", "name", "value"},
+  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
+  sink);
+
Table table = ...
-table.writeToSink(sink);
+table.insertInto("cassandraOutputTable");
{% endhighlight %}
</div>
@@ -1151,8 +1184,15 @@ val sink: CassandraAppendTableSink = new
CassandraAppendTableSink(
// the query must match the schema of the table
INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?))
+tableEnv.registerTableSink(
+  "cassandraOutputTable",
+  // specify table schema
+  Array[String]("id", "name", "value"),
+  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
+  sink)
+
val table: Table = ???
-table.writeToSink(sink)
+table.insertInto("cassandraOutputTable")
{% endhighlight %}
</div>
</div>
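As an aside, the connector snippets above pass the field names and types to registerTableSink() directly; the test changes further down in this PR use an equivalent pattern in which the sink is configured first and then registered with the two-argument overload. A rough sketch, assuming tableEnv and a result Table already exist (field names are illustrative):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.table.sinks.TableSink;

    // configure() returns a copy of the sink bound to the given schema
    TableSink<?> configuredSink = new CsvTableSink("/path/to/file", "|")
        .configure(
            new String[]{"f0", "f1"},
            new TypeInformation[]{Types.STRING, Types.INT});

    // two-argument overload: the schema is taken from the configured sink
    tableEnv.registerTableSink("csvOutputTable", configuredSink);
    result.insertInto("csvOutputTable");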
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index d0fea47cbfd..2fabc356e2d 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -515,8 +515,15 @@ Table result = ...
// create TableSink
TableSink<Row> sink = ...
+// register TableSink
+tableEnv.registerTableSink(
+ "outputTable", // table name
+ new String[]{...}, // field names
+ new TypeInformation[]{...}, // field types
+ sink); // table sink
+
// emit result Table via a TableSink
-result.writeToSink(sink, qConfig);
+result.insertInto("outputTable", qConfig);
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
@@ -539,8 +546,15 @@ val result: Table = ???
// create TableSink
val sink: TableSink[Row] = ???
+// register TableSink
+tableEnv.registerTableSink(
+ "outputTable", // table name
+ Array[String](...), // field names
+ Array[TypeInformation[_]](...), // field types
+ sink) // table sink
+
// emit result Table via a TableSink
-result.writeToSink(sink, qConfig)
+result.insertInto("outputTable", qConfig)
// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
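A short sketch of how the updated streaming.md snippet is typically used, assuming a StreamTableEnvironment and a sink already registered as "outputTable" (the retention values are illustrative):

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.table.api.StreamQueryConfig;

    StreamQueryConfig qConfig = tableEnv.queryConfig();
    // bound the per-key state kept for the continuous query
    qConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2));

    // the QueryConfig overload carries the retention settings into the
    // pipeline that writes to the registered sink
    result.insertInto("outputTable", qConfig);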
diff --git
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a2a77777ebf..84b7813cb8d 100644
---
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -24,6 +24,7 @@
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -450,9 +451,13 @@ public void testCassandraTableSink() throws Exception {
DataStreamSource<Row> source =
env.fromCollection(rowCollection);
tEnv.registerDataStreamInternal("testFlinkTable", source);
+ tEnv.registerTableSink("cassandraTable",
+ new CassandraAppendTableSink(builder,
injectTableName(INSERT_DATA_QUERY))
+ .configure(
+ new String[]{"f0", "f1", "f2"},
+ new TypeInformation[]{Types.STRING,
Types.INT, Types.INT}));
- tEnv.sql("select * from testFlinkTable").writeToSink(
- new CassandraAppendTableSink(builder,
injectTableName(INSERT_DATA_QUERY)));
+ tEnv.sqlQuery("select * from
testFlinkTable").insertInto("cassandraTable");
env.execute();
ResultSet rs =
session.execute(injectTableName(SELECT_DATA_QUERY));
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 6c968347334..00374246fbe 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -891,7 +891,13 @@ class Table(
*
* @param sink The [[TableSink]] to which the [[Table]] is written.
* @tparam T The data type that the [[TableSink]] expects.
+ *
+ * @deprecated Will be removed in a future release. Please register the
TableSink and use
+ * Table.insertInto().
*/
+ @deprecated("This method will be removed. Please register the TableSink and
use " +
+ "Table.insertInto().", "1.7.0")
+ @Deprecated
def writeToSink[T](sink: TableSink[T]): Unit = {
val queryConfig = Option(this.tableEnv) match {
case None => null
@@ -912,7 +918,13 @@ class Table(
* @param sink The [[TableSink]] to which the [[Table]] is written.
* @param conf The configuration for the query that writes to the sink.
* @tparam T The data type that the [[TableSink]] expects.
+ *
+ * @deprecated Will be removed in a future release. Please register the
TableSink and use
+ * Table.insertInto().
*/
+ @deprecated("This method will be removed. Please register the TableSink and
use " +
+ "Table.insertInto().", "1.7.0")
+ @Deprecated
def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = {
// get schema information of table
val rowType = getRelNode.getRowType
@@ -944,7 +956,12 @@ class Table(
* @param tableName Name of the registered [[TableSink]] to which the
[[Table]] is written.
*/
def insertInto(tableName: String): Unit = {
- tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+ this.logicalPlan match {
+ case _: LogicalTableFunctionCall =>
+ throw new ValidationException("TableFunction can only be used in join
and leftOuterJoin.")
+ case _ =>
+ tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+ }
}
/**
@@ -960,7 +977,12 @@ class Table(
* @param conf The [[QueryConfig]] to use.
*/
def insertInto(tableName: String, conf: QueryConfig): Unit = {
- tableEnv.insertInto(this, tableName, conf)
+ this.logicalPlan match {
+ case _: LogicalTableFunctionCall =>
+ throw ValidationException("TableFunction can only be used in join and
leftOuterJoin.")
+ case _ =>
+ tableEnv.insertInto(this, tableName, conf)
+ }
}
/**
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index 8f429e11dec..58a82c524dd 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -17,10 +17,12 @@
*/
package org.apache.flink.table.api.stream.table.validation
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.runtime.stream.table.TestAppendSink
import org.apache.flink.table.utils.{ObjectTableFunction, TableFunc1,
TableFunc2, TableTestBase}
import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
@@ -47,6 +49,9 @@ class CorrelateValidationTest extends TableTestBase {
val func1 = new TableFunc1
util.javaTableEnv.registerFunction("func1", func1)
+ util.javaTableEnv.registerTableSink(
+ "testSink", new TestAppendSink().configure(
+ Array[String]("f"), Array[TypeInformation[_]](Types.INT)))
// table function call select
expectExceptionThrown(
@@ -60,10 +65,10 @@ class CorrelateValidationTest extends TableTestBase {
"TableFunction can only be used in join and leftOuterJoin."
)
- // table function call writeToSink
+ // table function call insertInto
expectExceptionThrown(
- func1('c).writeToSink(null),
- "Cannot translate a query with an unbounded table function call."
+ func1('c).insertInto("testSink"),
+ "TableFunction can only be used in join and leftOuterJoin."
)
// table function call distinct
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
index 3f9c4018532..40b1eed4942 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
@@ -35,10 +35,11 @@ class TableSinkValidationTest extends TableTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num,
'text)
+ tEnv.registerTableSink("testSink", new TestAppendSink)
t.groupBy('text)
.select('text, 'id.count, 'num.sum)
- .writeToSink(new TestAppendSink)
+ .insertInto("testSink")
// must fail because table is not append-only
env.execute()
@@ -53,11 +54,12 @@ class TableSinkValidationTest extends TableTestBase {
val t = StreamTestData.get3TupleDataStream(env)
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text)
+ tEnv.registerTableSink("testSink", new TestUpsertSink(Array("len",
"cTrue"), false))
t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
.groupBy('len, 'cTrue)
.select('len, 'id.count, 'num.sum)
- .writeToSink(new TestUpsertSink(Array("len", "cTrue"), false))
+ .insertInto("testSink")
// must fail because table is updating table without full key
env.execute()
@@ -70,10 +72,11 @@ class TableSinkValidationTest extends TableTestBase {
val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e,
'f, 'g, 'h)
+ tEnv.registerTableSink("testSink", new TestAppendSink)
ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h)
.select('c, 'g)
- .writeToSink(new TestAppendSink)
+ .insertInto("testSink")
// must fail because table is not append-only
env.execute()
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
index 90a12ee2f70..b2b26cb6596 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
@@ -34,11 +34,12 @@ class TableSinksValidationTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Int, Long, String)]("MyTable", 'id, 'num, 'text)
+ util.tableEnv.registerTableSink("testSink", new TestAppendSink)
t.groupBy('text)
.select('text, 'id.count, 'num.sum)
// must fail because table is not append-only
- .writeToSink(new TestAppendSink)
+ .insertInto("testSink")
}
@Test(expected = classOf[TableException])
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d8ba29ae478..b68afceec07 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.runtime.batch.table
import java.io.File
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
import
org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
@@ -48,13 +49,18 @@ class TableSinkITCase(
val tEnv = TableEnvironment.getTableEnvironment(env, config)
env.setParallelism(4)
+ tEnv.registerTableSink(
+ "testSink",
+ new CsvTableSink(path, fieldDelim = "|").configure(
+ Array[String]("c", "b"), Array[TypeInformation[_]](Types.STRING,
Types.LONG)))
+
val input = CollectionDataSets.get3TupleDataSet(env)
.map(x => x).setParallelism(4) // increase DOP to 4
val results = input.toTable(tEnv, 'a, 'b, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
- .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
+ .insertInto("testSink")
env.execute()
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 85a96475a9a..304dbb317f3 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -185,13 +185,23 @@ class TimeAttributesITCase extends AbstractTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
MemoryTableSourceSinkUtil.clear()
+ tEnv.registerTableSink(
+ "testSink",
+ (new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink).configure(
+ Array[String]("rowtime", "floorDay", "ceilDay"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP)
+ ))
+
val stream = env
.fromCollection(data)
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec,
'string)
.filter('rowtime.cast(Types.LONG) > 4)
- .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY),
'rowtime.ceil(TimeIntervalUnit.DAY))
- .writeToSink(new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink)
+ .select(
+ 'rowtime,
+ 'rowtime.floor(TimeIntervalUnit.DAY).as('floorDay),
+ 'rowtime.ceil(TimeIntervalUnit.DAY).as('ceilDay))
+ .insertInto("testSink")
env.execute()
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index 219b7653c57..e4d938d4786 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.stream.table
import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
@@ -307,11 +308,16 @@ class AggregateITCase extends StreamingWithStateTestBase {
data.+=((4, 3L, "C"))
data.+=((5, 3L, "C"))
+ tEnv.registerTableSink(
+ "testSink",
+ new TestUpsertSink(Array("c"), false).configure(
+ Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING,
Types.LONG)))
+
val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
.groupBy('c)
.select('c, 'b.max)
- t.writeToSink(new TestUpsertSink(Array("c"), false))
+ t.insertInto("testSink")
env.execute()
val expected = List("(true,A,1)", "(true,B,2)", "(true,C,3)")
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index e1454487f35..197a0391288 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -20,12 +20,13 @@ package org.apache.flink.table.runtime.stream.table
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment,
TableException, Types}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData,
StreamingWithStateTestBase}
import org.junit.Assert._
import org.junit.Test
import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.expressions.{Literal, Null}
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
import
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
WeightedAvg}
@@ -77,6 +78,12 @@ class JoinITCase extends StreamingWithStateTestBase {
val leftTable = env.fromCollection(data1).toTable(tEnv, 'a, 'b)
val rightTable = env.fromCollection(data2).toTable(tEnv, 'bb, 'c)
+ tEnv.registerTableSink(
+ "upsertSink",
+ new TestUpsertSink(Array("a,b"), false).configure(
+ Array[String]("a", "b", "c"),
+ Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
val leftTableWithPk = leftTable
.groupBy('a)
.select('a, 'b.count as 'b)
@@ -88,7 +95,7 @@ class JoinITCase extends StreamingWithStateTestBase {
leftTableWithPk
.join(rightTableWithPk, 'b === 'bb)
.select('a, 'b, 'c)
- .writeToSink(new TestUpsertSink(Array("a,b"), false), queryConfig)
+ .insertInto("upsertSink", queryConfig)
env.execute()
val results = RowCollector.getAndClearValues
@@ -135,6 +142,12 @@ class JoinITCase extends StreamingWithStateTestBase {
val leftTable = env.fromCollection(data1).toTable(tEnv, 'a, 'b)
val rightTable = env.fromCollection(data2).toTable(tEnv, 'bb, 'c, 'd)
+ tEnv.registerTableSink(
+ "retractSink",
+ new TestRetractSink().configure(
+ Array[String]("a", "b", "c", "d"),
+ Array[TypeInformation[_]](Types.INT, Types.INT, Types.INT, Types.INT)))
+
val leftTableWithPk = leftTable
.groupBy('a)
.select('a, 'b.max as 'b)
@@ -142,7 +155,7 @@ class JoinITCase extends StreamingWithStateTestBase {
leftTableWithPk
.join(rightTable, 'a === 'bb && ('a < 4 || 'a > 4))
.select('a, 'b, 'c, 'd)
- .writeToSink(new TestRetractSink, queryConfig)
+ .insertInto("retractSink", queryConfig)
env.execute()
val results = RowCollector.getAndClearValues
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 70e59f3d24d..8e13599661d 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -54,7 +54,7 @@ class TableSinkITCase extends AbstractTestBase {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
- MemoryTableSourceSinkUtil.clear
+ MemoryTableSourceSinkUtil.clear()
val input = StreamTestData.get3TupleDataStream(env)
.assignAscendingTimestamps(r => r._2)
@@ -92,6 +92,12 @@ class TableSinkITCase extends AbstractTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setParallelism(4)
+ tEnv.registerTableSink(
+ "csvSink",
+ new CsvTableSink(path).configure(
+ Array[String]("c", "b"),
+ Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP)))
+
val input = StreamTestData.get3TupleDataStream(env)
.assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4
@@ -99,7 +105,7 @@ class TableSinkITCase extends AbstractTestBase {
input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
- .writeToSink(new CsvTableSink(path))
+ .insertInto("csvSink")
env.execute()
@@ -127,10 +133,16 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "appendSink",
+ new TestAppendSink().configure(
+ Array[String]("t", "icnt", "nsum"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG,
Types.LONG)))
+
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w)
- .select('w.end, 'id.count, 'num.sum)
- .writeToSink(new TestAppendSink)
+ .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
+ .insertInto("appendSink")
env.execute()
@@ -155,9 +167,15 @@ class TableSinkITCase extends AbstractTestBase {
val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a,
'b, 'c)
val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e,
'f, 'g, 'h)
+ tEnv.registerTableSink(
+ "appendSink",
+ new TestAppendSink().configure(
+ Array[String]("c", "g"),
+ Array[TypeInformation[_]](Types.STRING, Types.STRING)))
+
ds1.join(ds2).where('b === 'e)
.select('c, 'g)
- .writeToSink(new TestAppendSink)
+ .insertInto("appendSink")
env.execute()
@@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text)
+ tEnv.registerTableSink(
+ "retractSink",
+ new TestRetractSink().configure(
+ Array[String]("len", "icnt", "nsum"),
+ Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
t.select('id, 'num, 'text.charLength() as 'len)
.groupBy('len)
- .select('len, 'id.count, 'num.sum)
- .writeToSink(new TestRetractSink)
+ .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
+ .insertInto("retractSink")
env.execute()
val results = RowCollector.getAndClearValues
@@ -209,10 +233,16 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "retractSink",
+ new TestRetractSink().configure(
+ Array[String]("t", "icnt", "nsum"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG,
Types.LONG)))
+
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w)
- .select('w.end, 'id.count, 'num.sum)
- .writeToSink(new TestRetractSink)
+ .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
+ .insertInto("retractSink")
env.execute()
val results = RowCollector.getAndClearValues
@@ -244,12 +274,18 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text)
+ tEnv.registerTableSink(
+ "upsertSink",
+ new TestUpsertSink(Array("cnt", "cTrue"), false).configure(
+ Array[String]("cnt", "lencnt", "cTrue"),
+ Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.BOOLEAN)))
+
t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
.groupBy('len, 'cTrue)
.select('len, 'id.count as 'cnt, 'cTrue)
.groupBy('cnt, 'cTrue)
- .select('cnt, 'len.count, 'cTrue)
- .writeToSink(new TestUpsertSink(Array("cnt", "cTrue"), false))
+ .select('cnt, 'len.count as 'lencnt, 'cTrue)
+ .insertInto("upsertSink")
env.execute()
val results = RowCollector.getAndClearValues
@@ -279,10 +315,16 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "upsertSink",
+ new TestUpsertSink(Array("wend", "num"), true).configure(
+ Array[String]("num", "wend", "icnt"),
+ Array[TypeInformation[_]](Types.LONG, Types.SQL_TIMESTAMP,
Types.LONG)))
+
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
- .select('num, 'w.end as 'wend, 'id.count)
- .writeToSink(new TestUpsertSink(Array("wend", "num"), true))
+ .select('num, 'w.end as 'wend, 'id.count as 'icnt)
+ .insertInto("upsertSink")
env.execute()
val results = RowCollector.getAndClearValues
@@ -317,10 +359,17 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "upsertSink",
+ new TestUpsertSink(Array("wstart", "wend", "num"), true).configure(
+ Array[String]("wstart", "wend", "num", "icnt"),
+ Array[TypeInformation[_]]
+ (Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
- .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count)
- .writeToSink(new TestUpsertSink(Array("wstart", "wend", "num"), true))
+ .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count as 'icnt)
+ .insertInto("upsertSink")
env.execute()
val results = RowCollector.getAndClearValues
@@ -355,10 +404,16 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "upsertSink",
+ new TestUpsertSink(null, true).configure(
+ Array[String]("wend", "cnt"),
+ Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG)))
+
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
.select('w.end as 'wend, 'id.count as 'cnt)
- .writeToSink(new TestUpsertSink(null, true))
+ .insertInto("upsertSink")
env.execute()
val results = RowCollector.getAndClearValues
@@ -393,10 +448,16 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._1.toLong)
.toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+ tEnv.registerTableSink(
+ "upsertSink",
+ new TestUpsertSink(null, true).configure(
+ Array[String]("num", "cnt"),
+ Array[TypeInformation[_]](Types.LONG, Types.LONG)))
+
t.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'num)
.select('num, 'id.count as 'cnt)
- .writeToSink(new TestUpsertSink(null, true))
+ .insertInto("upsertSink")
env.execute()
val results = RowCollector.getAndClearValues
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Drop the Table.writeToSink() method
> -----------------------------------
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a
> table into a table that was previously registered with a {{TableSink}} in the
> catalog. It is the inverse method to the {{scan()}} method and the equivalent
> to an {{INSERT INTO ... SELECT}} SQL query.
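> A minimal sketch of that equivalence, assuming a sink registered under the
> name {{CsvSinkTable}} (the names are illustrative only):
>
>     // Table API: emit the query result into the registered sink
>     result.insertInto("CsvSinkTable");
>     // SQL counterpart, handled by TableEnvironment.sqlUpdate()
>     tableEnv.sqlUpdate("INSERT INTO CsvSinkTable SELECT a, b, c FROM table1");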
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks
> (and TableSources) should only be registered with the {{TableEnvironment}}
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is
> more aligned with SQL.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)