[FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.
This closes #3829. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb37cb9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb37cb9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb37cb9 Branch: refs/heads/master Commit: 2cb37cb937c6f225ad7afe829a28a6eda043ffc1 Parents: df5efe9 Author: lincoln-lil <[email protected]> Authored: Thu May 4 17:52:34 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Wed Sep 20 10:12:13 2017 +0200 ---------------------------------------------------------------------- docs/dev/table/common.md | 153 ++++++++++++-- docs/dev/table/sql.md | 83 ++++++-- docs/dev/table/tableApi.md | 105 ++++++++-- docs/dev/table/udfs.md | 20 +- .../addons/hbase/HBaseConnectorITCase.java | 8 +- .../flink/table/examples/java/WordCountSQL.java | 2 +- .../table/examples/scala/StreamSQLExample.scala | 2 +- .../table/examples/scala/WordCountSQL.scala | 2 +- .../flink/table/api/BatchTableEnvironment.scala | 50 ++++- .../table/api/StreamTableEnvironment.scala | 60 +++++- .../flink/table/api/TableEnvironment.scala | 202 +++++++++++++++++-- .../apache/flink/table/api/queryConfig.scala | 1 + .../org/apache/flink/table/api/table.scala | 41 +++- .../flink/table/calcite/FlinkPlannerImpl.scala | 6 +- .../table/plan/schema/TableSinkTable.scala | 45 +++++ .../runtime/batch/JavaTableSourceITCase.java | 2 +- .../runtime/batch/sql/GroupingSetsITCase.java | 6 +- .../table/runtime/batch/sql/JavaSqlITCase.java | 12 +- .../table/runtime/stream/sql/JavaSqlITCase.java | 8 +- .../api/batch/BatchTableEnvironmentTest.scala | 4 +- .../sql/validation/CalcValidationTest.scala | 2 +- .../validation/InsertIntoValidationTest.scala | 77 +++++++ .../sql/validation/JoinValidationTest.scala | 22 +- .../validation/OverWindowValidationTest.scala | 4 +- .../sql/validation/SortValidationTest.scala | 2 +- .../validation/InsertIntoValidationTest.scala | 61 ++++++ .../api/stream/StreamTableEnvironmentTest.scala | 4 +- .../flink/table/api/stream/sql/JoinTest.scala | 4 +- .../validation/InsertIntoValidationTest.scala | 87 ++++++++ .../validation/OverWindowValidationTest.scala | 6 +- .../validation/CorrelateValidationTest.scala | 6 +- .../validation/InsertIntoValidationTest.scala | 68 +++++++ .../validation/TableSinksValidationTest.scala | 27 ++- .../expressions/utils/ExpressionTestBase.scala | 1 - .../plan/ExpressionReductionRulesTest.scala | 4 +- .../flink/table/plan/RetractionRulesTest.scala | 2 +- .../plan/TimeIndicatorConversionTest.scala | 8 +- .../runtime/batch/sql/AggregateITCase.scala | 34 ++-- .../table/runtime/batch/sql/CalcITCase.scala | 26 +-- .../table/runtime/batch/sql/JoinITCase.scala | 44 ++-- .../runtime/batch/sql/SetOperatorsITCase.scala | 24 +-- .../table/runtime/batch/sql/SortITCase.scala | 8 +- .../batch/sql/TableEnvironmentITCase.scala | 35 +++- .../runtime/batch/sql/TableSourceITCase.scala | 4 +- .../table/runtime/batch/table/CalcITCase.scala | 6 +- .../batch/table/TableEnvironmentITCase.scala | 24 +++ .../runtime/stream/TimeAttributesITCase.scala | 4 +- .../table/runtime/stream/sql/JoinITCase.scala | 4 +- .../runtime/stream/sql/OverWindowITCase.scala | 30 +-- .../table/runtime/stream/sql/SortITCase.scala | 2 +- .../table/runtime/stream/sql/SqlITCase.scala | 59 ++++-- .../runtime/stream/sql/TableSourceITCase.scala | 2 +- .../runtime/stream/table/TableSinkITCase.scala | 32 ++- .../flink/table/utils/MemoryTableSinkUtil.scala | 84 ++++++++ .../table/utils/MockTableEnvironment.scala | 6 + 
.../flink/table/utils/TableTestBase.scala | 8 +- 56 files changed, 1371 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/common.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md index fed2b6d..acd5711 100644 --- a/docs/dev/table/common.md +++ b/docs/dev/table/common.md @@ -50,7 +50,7 @@ tableEnv.registerExternalCatalog("extCat", ...); // create a Table from a Table API query Table tapiResult = tableEnv.scan("table1").select(...); // create a Table from a SQL query -Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... "); +Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... "); // emit a Table API result Table to a TableSink, same for SQL result tapiResult.writeToSink(...); @@ -77,7 +77,7 @@ tableEnv.registerExternalCatalog("extCat", ...) // create a Table from a Table API query val tapiResult = tableEnv.scan("table1").select(...) // Create a Table from a SQL query -val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...") +val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") // emit a Table API result Table to a TableSink, same for SQL result tapiResult.writeToSink(...) @@ -149,18 +149,18 @@ val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) {% top %} -Register a Table in the Catalog +Register Tables in the Catalog ------------------------------- -A `TableEnvironment` has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables which are registered in the catalog, by referencing them by name. +A `TableEnvironment` maintains a catalog of tables which are registered by name. There are two types of tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system. -A `TableEnvironment` allows you to register a table from various sources: +An input table can be registered from various sources: * an existing `Table` object, usually the result of a Table API or SQL query. * a `TableSource`, which accesses external data, such as a file, database, or messaging system. -* a `DataStream` or `DataSet` from a DataStream or DataSet program. +* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section. -Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section. +An output table can be registered using a `TableSink`. ### Register a Table @@ -200,7 +200,7 @@ tableEnv.registerTable("projectedTable", projTable) ### Register a TableSource -A `TableSource` provides access to external data which is stored in a storage systems such as a database (MySQL, HBase, ...), a file with specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...). +A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`. @@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource) {% top %} +### Register a TableSink + +A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...). + +Flink aims to provide TableSinks for common data formats and storage systems. Please see the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`. + +A `TableSink` is registered in a `TableEnvironment` as follows: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// create a TableSink +TableSink csvSink = new CsvTableSink("/path/to/file", ...); + +// define the field names and types +String[] fieldNames = {"a", "b", "c"}; +TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; + +// register the TableSink as table "CsvSinkTable" +tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +// get a TableEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// create a TableSink +val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) + +// define the field names and types +val fieldNames: Array[String] = Array("a", "b", "c") +val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) + +// register the TableSink as table "CsvSinkTable" +tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) +{% endhighlight %} +</div> +</div> + +{% top %} + Register an External Catalog ---------------------------- @@ -342,7 +388,7 @@ Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org) The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables. -The following example shows how to specify a query and return the result as a Table. +The following example shows how to specify a query and return the result as a `Table`.
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -353,7 +399,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register Orders table // compute revenue for all customers from France -Table revenue = tableEnv.sql( +Table revenue = tableEnv.sqlQuery( "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + @@ -373,7 +419,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env) // register Orders table // compute revenue for all customers from France -Table revenue = tableEnv.sql(""" +Table revenue = tableEnv.sqlQuery(""" |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = 'FRANCE' @@ -387,6 +433,53 @@ Table revenue = tableEnv.sql(""" </div> </div> +The following example shows how to specify an update query that inserts its result into a registered table. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// register "Orders" table +// register "RevenueFrance" output table + +// compute revenue for all customers from France and emit to "RevenueFrance" +tableEnv.sqlUpdate( + "INSERT INTO RevenueFrance " + + "SELECT cID, cName, SUM(revenue) AS revSum " + + "FROM Orders " + + "WHERE cCountry = 'FRANCE' " + + "GROUP BY cID, cName" + ); + +// execute query +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +// get a TableEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// register "Orders" table +// register "RevenueFrance" output table + +// compute revenue for all customers from France and emit to "RevenueFrance" +tableEnv.sqlUpdate(""" + |INSERT INTO RevenueFrance + |SELECT cID, cName, SUM(revenue) AS revSum + |FROM Orders + |WHERE cCountry = 'FRANCE' + |GROUP BY cID, cName + """.stripMargin) + +// execute query +{% endhighlight %} + +</div> +</div> + {% top %} ### Mixing Table API and SQL @@ -401,12 +494,19 @@ Table API and SQL queries can be easily mixed because both return `Table` object Emit a Table ------------ -In order to emit a `Table`, it can be written to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). +A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). -A batch `Table` can only be written to a `BatchTableSink`, while a streaming table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. +A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`. +There are two ways to emit a table: + +1. 
The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit. +2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`. + +The following examples show how to emit a `Table`: + <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} @@ -419,9 +519,18 @@ Table result = ... // create a TableSink TableSink sink = new CsvTableSink("/path/to/file", "|"); -// write the result Table to the TableSink +// METHOD 1: +// Emit the result Table to the TableSink via the writeToSink() method result.writeToSink(sink); +// METHOD 2: +// Register the TableSink with a specific schema +String[] fieldNames = {"a", "b", "c"}; +TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; +tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); +// Emit the result Table to the registered TableSink via the insertInto() method +result.insertInto("CsvSinkTable"); + // execute the program {% endhighlight %} </div> @@ -437,9 +546,18 @@ val result: Table = ... // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") -// write the result Table to the TableSink +// METHOD 1: +// Emit the result Table to the TableSink via the writeToSink() method result.writeToSink(sink) +// METHOD 2: +// Register the TableSink with a specific schema +val fieldNames: Array[String] = Array("a", "b", "c") +val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) +tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink) +// Emit the result Table to the registered TableSink via the insertInto() method +result.insertInto("CsvSinkTable") + // execute the program {% endhighlight %} </div> @@ -458,8 +576,9 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de A Table API or SQL query is translated when: -* the `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` is called. -* the `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)). +* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called. +* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called. +* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
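To tie the pieces of the new emission API together, here is a minimal, self-contained Scala sketch of the registration, emission, and translation steps described above. It is an illustration rather than part of the commit: the table name "Orders", the sink path, and the schema are assumptions, but it uses only APIs this diff introduces or documents (`registerTableSink`, `insertInto`, `sqlUpdate`).

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object EmitTableSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // assumed input table: register a DataStream as "Orders" (a: Int, b: String, c: Long)
    val orders = env.fromElements((1, "beer", 3L), (2, "diaper", 4L))
    tableEnv.registerDataStream("Orders", orders, 'a, 'b, 'c)

    // register the output table once; CsvTableSink is an AppendStreamTableSink
    val fieldNames = Array("a", "b", "c")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes,
      new CsvTableSink("/tmp/out.csv", "|"))

    // emit via the Table API (METHOD 2 above: look up the registered sink by name) ...
    tableEnv.scan("Orders").insertInto("CsvSinkTable")
    // ... or equivalently via SQL (would write the rows a second time if enabled):
    // tableEnv.sqlUpdate("INSERT INTO CsvSinkTable SELECT a, b, c FROM Orders")

    // translation happens at insertInto()/sqlUpdate(); execution starts here
    env.execute()
  }
}
{% endhighlight %}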
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index fa4e3f3..b9205ab 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -49,15 +49,25 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...); // SQL query with an inlined (unregistered) table Table table = tableEnv.toTable(ds, "user, product, amount"); -Table result = tableEnv.sql( +Table result = tableEnv.sqlQuery( "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'"); // SQL query with a registered table // register the DataStream as table "Orders" tableEnv.registerDataStream("Orders", ds, "user, product, amount"); // run a SQL query on the Table and retrieve the result as a new Table -Table result2 = tableEnv.sql( +Table result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); + +// SQL update with a registered table +// create and register a TableSink +TableSink csvSink = new CsvTableSink("/path/to/file", ...); +String[] fieldNames = {"product", "amount"}; +TypeInformation[] fieldTypes = {Types.STRING, Types.INT}; +tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink); +// run a SQL update query on the Table and emit the result to the TableSink +tableEnv.sqlUpdate( + "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); {% endhighlight %} </div> @@ -71,15 +81,25 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...) // SQL query with an inlined (unregistered) table val table = ds.toTable(tableEnv, 'user, 'product, 'amount) -val result = tableEnv.sql( +val result = tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'") // SQL query with a registered table // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount) // run a SQL query on the Table and retrieve the result as a new Table -val result2 = tableEnv.sql( +val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") + +// SQL update with a registered table +// create and register a TableSink +val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) +val fieldNames: Array[String] = Array("product", "amount") +val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT) +tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink) +// run a SQL update query on the Table and emit the result to the TableSink +tableEnv.sqlUpdate( + "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") {% endhighlight %} </div> </div> @@ -89,7 +109,7 @@ val result2 = tableEnv.sql( Supported Syntax ---------------- -Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DML and DDL statements are not supported by Flink. +Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink. The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The [Operations](#operations) section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
@@ -156,6 +176,10 @@ groupItem: | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' +insert: + INSERT INTO tableReference + query + ``` Flink SQL uses a lexical policy for identifier (table, attribute, function names) similar to Java: @@ -566,6 +590,39 @@ LIMIT 3 {% top %} +### Insert + +<div markdown="1"> +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Operation</th> + <th class="text-center">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td> + <strong>Insert Into</strong><br> + <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> + </td> + <td> + <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p> + +{% highlight sql %} +INSERT INTO OutputTable +SELECT users, tag +FROM Orders +{% endhighlight %} + </td> + </tr> + + </tbody> +</table> +</div> + +{% top %} + ### Group Windows Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables. @@ -649,22 +706,22 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...); tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime"); // compute SUM(amount) per day (in event-time) -Table result1 = tableEnv.sql( +Table result1 = tableEnv.sqlQuery( "SELECT user, " + " TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " + " SUM(amount) FROM Orders " + "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user"); // compute SUM(amount) per day (in processing-time) -Table result2 = tableEnv.sql( +Table result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user"); // compute every hour the SUM(amount) of the last 24 hours in event-time -Table result3 = tableEnv.sql( +Table result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product"); // compute SUM(amount) per session with 12 hour inactivity gap (in event-time) -Table result4 = tableEnv.sql( +Table result4 = tableEnv.sqlQuery( "SELECT user, " + " SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " + " SESSION_END(rowtime, INTERVAL '12' HOUR) AS snd, " + @@ -686,7 +743,7 @@ val ds: DataStream[(Long, String, Int)] = env.addSource(...) 
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime) // compute SUM(amount) per day (in event-time) -val result1 = tableEnv.sql( +val result1 = tableEnv.sqlQuery( """ |SELECT | user, @@ -697,15 +754,15 @@ val result1 = tableEnv.sql( """.stripMargin) // compute SUM(amount) per day (in processing-time) -val result2 = tableEnv.sql( +val result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user") // compute every hour the SUM(amount) of the last 24 hours in event-time -val result3 = tableEnv.sql( +val result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product") // compute SUM(amount) per session with 12 hour inactivity gap (in event-time) -val result4 = tableEnv.sql( +val result4 = tableEnv.sqlQuery( """ |SELECT | user, http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/tableApi.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index fd57111..0a2acab 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -961,7 +961,52 @@ val result = left.select('a, 'b, 'c).where('a.in(right)); <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Operators</th> + <th class="text-center">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td> + <strong>Order By</strong><br> + <span class="label label-primary">Batch</span> + </td> + <td> + <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p> +{% highlight java %} +Table in = tableEnv.fromDataSet(ds, "a, b, c"); +Table result = in.orderBy("a.asc"); +{% endhighlight %} + </td> + </tr> + + <tr> + <td> + <strong>Limit</strong><br> + <span class="label label-primary">Batch</span> + </td> + <td> + <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. 
Limit is technically part of the Order By operator and thus must be preceded by it.</p> +{% highlight java %} +Table in = tableEnv.fromDataSet(ds, "a, b, c"); +Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record +{% endhighlight %} +or +{% highlight java %} +Table in = tableEnv.fromDataSet(ds, "a, b, c"); +Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record +{% endhighlight %} + </td> + </tr> + + </tbody> +</table> +</div> +<div data-lang="scala" markdown="1"> <table class="table table-bordered"> <thead> <tr> @@ -970,7 +1015,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right)); </tr> </thead> <tbody> - <tr> + <tr> <td> <strong>Order By</strong><br> <span class="label label-primary">Batch</span> @@ -1005,9 +1050,13 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with </tbody> </table> - </div> -<div data-lang="scala" markdown="1"> +</div> + +### Insert + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> <table class="table table-bordered"> <thead> <tr> @@ -1017,35 +1066,49 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with </tr> </thead> <tbody> - <tr> + <tr> <td> - <strong>Order By</strong><br> - <span class="label label-primary">Batch</span> + <strong>Insert Into</strong><br> + <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> </td> <td> - <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p> + <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p> + + <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p> + {% highlight java %} -Table in = tableEnv.fromDataSet(ds, "a, b, c"); -Table result = in.orderBy("a.asc"); +Table orders = tableEnv.scan("Orders"); +orders.insertInto("OutOrders"); {% endhighlight %} </td> </tr> + </tbody> +</table> +</div> + +<div data-lang="scala" markdown="1"> +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Operators</th> + <th class="text-center">Description</th> + </tr> + </thead> + <tbody> <tr> <td> - <strong>Limit</strong><br> - <span class="label label-primary">Batch</span> + <strong>Insert Into</strong><br> + <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> </td> <td> - <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p> -{% highlight java %} -Table in = tableEnv.fromDataSet(ds, "a, b, c"); -Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record -{% endhighlight %} -or -{% highlight java %} -Table in = tableEnv.fromDataSet(ds, "a, b, c"); -Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record + <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p> + + <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>).
Moreover, the schema of the registered table must match the schema of the query.</p> + +{% highlight scala %} +val orders: Table = tableEnv.scan("Orders") +orders.insertInto("OutOrders") {% endhighlight %} </td> </tr> @@ -1055,6 +1118,8 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w </div> </div> +{% top %} + ### Group Windows Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals. http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/udfs.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md index 6c9bc1a..eef7db6 100644 --- a/docs/dev/table/udfs.md +++ b/docs/dev/table/udfs.md @@ -72,7 +72,7 @@ tableEnv.registerFunction("hashCode", new HashCode(10)); myTable.select("string, string.hashCode(), hashCode(string)"); // use the function in SQL API -tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); {% endhighlight %} </div> @@ -93,7 +93,7 @@ myTable.select('string, hashCode('string)) // register and use the function in SQL tableEnv.registerFunction("hashCode", new HashCode(10)) -tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); {% endhighlight %} </div> </div> @@ -176,9 +176,9 @@ myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length"); // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API). -tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API). -tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE"); {% endhighlight %} </div> @@ -206,9 +206,9 @@ tableEnv.registerFunction("split", new Split("#")) // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API) -tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API) -tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE"); +tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE"); {% endhighlight %} **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues. </div> @@ -572,7 +572,7 @@ StreamTableEnvironment tEnv = ... 
tEnv.registerFunction("wAvg", new WeightedAvg()); // use function -tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user"); +tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user"); {% endhighlight %} </div> @@ -649,7 +649,7 @@ val tEnv: StreamTableEnvironment = ??? tEnv.registerFunction("wAvg", new WeightedAvg()) // use function -tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user") +tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user") {% endhighlight %} </div> @@ -720,7 +720,7 @@ tableEnv.registerFunction("hashCode", new HashCode()); myTable.select("string, string.hashCode(), hashCode(string)"); // use the function in SQL -tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); {% endhighlight %} </div> @@ -748,7 +748,7 @@ myTable.select('string, hashCode('string)) // register and use the function in SQL tableEnv.registerFunction("hashCode", hashCode) -tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable"); +tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); {% endhighlight %} </div> http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java index 5d71ca5..3da4230 100644 --- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java @@ -155,7 +155,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { hbaseTable.addColumn(FAMILY3, F3COL3, String.class); tableEnv.registerTableSource("hTable", hbaseTable); - Table result = tableEnv.sql( + Table result = tableEnv.sqlQuery( "SELECT " + " h.family1.col1, " + " h.family2.col1, " + @@ -196,7 +196,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { hbaseTable.addColumn(FAMILY3, F3COL3, String.class); tableEnv.registerTableSource("hTable", hbaseTable); - Table result = tableEnv.sql( + Table result = tableEnv.sqlQuery( "SELECT " + " h.family1.col1, " + " h.family3.col1, " + @@ -236,7 +236,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { hbaseTable.addColumn(FAMILY3, F3COL3, String.class); tableEnv.registerTableSource("hTable", hbaseTable); - Table result = tableEnv.sql( + Table result = tableEnv.sqlQuery( "SELECT * FROM hTable AS h" ); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); @@ -270,7 +270,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { tableEnv.registerFunction("toUTF8", new ToUTF8()); tableEnv.registerFunction("toLong", new ToLong()); - Table result = tableEnv.sql( + Table result = tableEnv.sqlQuery( "SELECT " + " toUTF8(h.family2.col1), " + " toLong(h.family2.col2) " + http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java index 65efc17..22f0553 100644 --- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java @@ -53,7 +53,7 @@ public class WordCountSQL { tEnv.registerDataSet("WordCount", input, "word, frequency"); // run a SQL query on the Table and retrieve the result as a new Table - Table table = tEnv.sql( + Table table = tEnv.sqlQuery( "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); DataSet<WC> result = tEnv.toDataSet(table, WC.class); http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala index 665913e..3297aec 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -58,7 +58,7 @@ object StreamSQLExample { tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount) // union the two tables - val result = tEnv.sql( + val result = tEnv.sqlQuery( "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala index a8b8268..55bbdb5 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala @@ -48,7 +48,7 @@ object WordCountSQL { tEnv.registerDataSet("WordCount", input, 'word, 'frequency) // run a SQL query on the Table and retrieve the result as a new Table - val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") + val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") table.toDataSet[WC].print() } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index a9d60dd..bca5826 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable} +import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSinkTable, TableSourceTable} import org.apache.flink.table.runtime.MapRunner import org.apache.flink.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.table.sources.{BatchTableSource, TableSource} @@ -106,6 +106,54 @@ abstract class BatchTableEnvironment( } /** + * Registers an external [[TableSink]] with given field names and types in this + * [[TableEnvironment]]'s catalog. + * Registered sink tables can be referenced in SQL DML statements. + * + * Example: + * + * {{{ + * // create a table sink and its field names and types + * val fieldNames: Array[String] = Array("a", "b", "c") + * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG) + * val tableSink: BatchTableSink = new YourTableSinkImpl(...) + * + * // register the table sink in the catalog + * tableEnv.registerTableSink("output_table", fieldNames, fieldTypes, tableSink) + * + * // use the registered sink + * tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable") + * }}} + * + * @param name The name under which the [[TableSink]] is registered. + * @param fieldNames The field names to register with the [[TableSink]]. + * @param fieldTypes The field types to register with the [[TableSink]]. + * @param tableSink The [[TableSink]] to register. + */ + def registerTableSink( + name: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], + tableSink: TableSink[_]): Unit = { + + checkValidTableName(name) + if (fieldNames == null) throw TableException("fieldNames must not be null.") + if (fieldTypes == null) throw TableException("fieldTypes must not be null.") + if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.") + if (fieldNames.length != fieldTypes.length) { + throw new TableException("Same number of field names and types required.") + } + + tableSink match { + case batchTableSink: BatchTableSink[_] => + val configuredSink = batchTableSink.configure(fieldNames, fieldTypes) + registerTableInternal(name, new TableSinkTable(configuredSink)) + case _ => + throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.") + } + } + + /** * Writes a [[Table]] to a [[TableSink]].
* * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 8d8cebb..c7cc61b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -42,12 +42,12 @@ import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait} import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable} import org.apache.flink.table.plan.util.UpdatingPlanChecker import org.apache.flink.table.runtime.conversion._ +import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction} -import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} +import org.apache.flink.table.sinks._ import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource} import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils} @@ -79,7 +79,7 @@ abstract class StreamTableEnvironment( // the naming pattern for internally registered tables. private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r - def queryConfig: StreamQueryConfig = new StreamQueryConfig + override def queryConfig: StreamQueryConfig = new StreamQueryConfig /** * Checks if the chosen table name is valid. @@ -130,6 +130,60 @@ abstract class StreamTableEnvironment( } /** + * Registers an external [[TableSink]] with given field names and types in this + * [[TableEnvironment]]'s catalog. + * Registered sink tables can be referenced in SQL DML statements. + * + * Example: + * + * {{{ + * // create a table sink and its field names and types + * val fieldNames: Array[String] = Array("a", "b", "c") + * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG) + * val tableSink: StreamTableSink = new YourTableSinkImpl(...) + * + * // register the table sink in the catalog + * tableEnv.registerTableSink("output_table", fieldNames, fieldTypes, tableSink) + * + * // use the registered sink + * tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable") + * }}} + * + * @param name The name under which the [[TableSink]] is registered. + * @param fieldNames The field names to register with the [[TableSink]]. + * @param fieldTypes The field types to register with the [[TableSink]]. + * @param tableSink The [[TableSink]] to register.
+ */ + def registerTableSink( + name: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], + tableSink: TableSink[_]): Unit = { + + checkValidTableName(name) + if (fieldNames == null) throw TableException("fieldNames must not be null.") + if (fieldTypes == null) throw TableException("fieldTypes must not be null.") + if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.") + if (fieldNames.length != fieldTypes.length) { + throw new TableException("Same number of field names and types required.") + } + + tableSink match { + case streamTableSink@( + _: AppendStreamTableSink[_] | + _: UpsertStreamTableSink[_] | + _: RetractStreamTableSink[_]) => + + val configuredSink = streamTableSink.configure(fieldNames, fieldTypes) + registerTableInternal(name, new TableSinkTable(configuredSink)) + case _ => + throw new TableException( + "Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " + + "registered in StreamTableEnvironment.") + } + } + + /** * Writes a [[Table]] to a [[TableSink]]. * * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 2e9e18f..0424cf8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.sql.SqlOperatorTable +import org.apache.calcite.sql._ import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools._ @@ -49,13 +49,13 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction} -import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _} -import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions, _} +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{RelTable, RowSchema} +import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} 
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo @@ -110,6 +110,13 @@ abstract class TableEnvironment(val config: TableConfig) { /** Returns the table config to define the runtime behavior of the Table API. */ def getConfig: TableConfig = config + /** Returns the [[QueryConfig]], depending on the concrete type of this TableEnvironment. */ + private[flink] def queryConfig: QueryConfig = this match { + case _: BatchTableEnvironment => new BatchQueryConfig + case _: StreamTableEnvironment => new StreamQueryConfig + case _ => null + } + /** * Returns the operator table for this environment including a custom Calcite configuration. */ @@ -276,7 +283,8 @@ abstract class TableEnvironment(val config: TableConfig) { s"${t.msg}\n" + s"Please check the documentation for the set of currently supported SQL features.") case a: AssertionError => - throw a.getCause + // keep original exception stack for caller + throw a } output } @@ -414,6 +422,22 @@ abstract class TableEnvironment(val config: TableConfig) { def registerTableSource(name: String, tableSource: TableSource[_]): Unit /** + * Registers an external [[TableSink]] with given field names and types in this + * [[TableEnvironment]]'s catalog. + * Registered sink tables can be referenced in SQL DML statements. + * + * @param name The name under which the [[TableSink]] is registered. + * @param fieldNames The field names to register with the [[TableSink]]. + * @param fieldTypes The field types to register with the [[TableSink]]. + * @param tableSink The [[TableSink]] to register. + */ + def registerTableSink( + name: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], + tableSink: TableSink[_]): Unit + + /** * Replaces a registered Table with another Table under the same name. * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]] * with a [[org.apache.calcite.schema.TranslatableTable]]. @@ -489,9 +513,10 @@ abstract class TableEnvironment(val config: TableConfig) { /** * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. * - * All tables referenced by the query must be registered in the TableEnvironment. But - * [[Table.toString]] will automatically register an unique table name and return the - * table name. So it allows to call SQL directly on tables like this: + * All tables referenced by the query must be registered in the TableEnvironment. + * A [[Table]] is automatically registered when its [[toString]] method is called, for example + * when it is embedded into a String. + * Hence, SQL queries can directly reference a [[Table]] as follows: * * {{{ * val table: Table = ... * @param query The SQL query to evaluate. * @return The result of the query as Table. */ + @deprecated("Please use sqlQuery() instead.") def sql(query: String): Table = { + sqlQuery(query) + } + + /** + * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. + * + * All tables referenced by the query must be registered in the TableEnvironment. + * A [[Table]] is automatically registered when its [[toString]] method is called, for example + * when it is embedded into a String. + * Hence, SQL queries can directly reference a [[Table]] as follows: + * + * {{{ + * val table: Table = ... + * // the table is not registered to the table environment + * tEnv.sqlQuery(s"SELECT * FROM $table") + * }}} + * + * @param query The SQL query to evaluate.
+ * @return The result of the query as Table. + */ + def sqlQuery(query: String): Table = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(query) - // validate the sql query - val validated = planner.validate(parsed) - // transform to a relational tree - val relational = planner.rel(validated) + if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { + // validate the sql query + val validated = planner.validate(parsed) + // transform to a relational tree + val relational = planner.rel(validated) + new Table(this, LogicalRelNode(relational.rel)) + } else { + throw new TableException( + "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " + + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.") + } + } + + /** + * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. + * NOTE: Currently only SQL INSERT statements are supported. + * + * All tables referenced by the query must be registered in the TableEnvironment. + * A [[Table]] is automatically registered when its [[toString]] method is called, for example + * when it is embedded into a String. + * Hence, SQL queries can directly reference a [[Table]] as follows: + * + * {{{ + * // register the table sink into which the result is inserted. + * tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes, tableSink) + * val sourceTable: Table = ... + * // sourceTable is not registered to the table environment + * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable") + * }}} + * + * @param stmt The SQL statement to evaluate. + */ + def sqlUpdate(stmt: String): Unit = { + sqlUpdate(stmt, this.queryConfig) + } - new Table(this, LogicalRelNode(relational.rel)) + /** + * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. + * NOTE: Currently only SQL INSERT statements are supported. + * + * All tables referenced by the query must be registered in the TableEnvironment. + * A [[Table]] is automatically registered when its [[toString]] method is called, for example + * when it is embedded into a String. + * Hence, SQL queries can directly reference a [[Table]] as follows: + * + * {{{ + * // register the table sink into which the result is inserted. + * tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes, tableSink) + * val sourceTable: Table = ... + * // sourceTable is not registered to the table environment + * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable") + * }}} + * + * @param stmt The SQL statement to evaluate. + * @param config The [[QueryConfig]] to use. + */ + def sqlUpdate(stmt: String, config: QueryConfig): Unit = { + val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) + // parse the sql query + val parsed = planner.parse(stmt) + parsed match { + case insert: SqlInsert => + // validate the SQL query + val query = insert.getSource + planner.validate(query) + + // get query result as Table + val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel)) + + // get name of sink table + val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) + + // insert query result into sink table + insertInto(queryResult, targetTableName, config) + case _ => + throw new TableException( + "Unsupported SQL query!
sqlUpdate() only accepts SQL statements of type INSERT.") + } } /** @@ -519,11 +638,62 @@ abstract class TableEnvironment(val config: TableConfig) { * * @param table The [[Table]] to write. * @param sink The [[TableSink]] to write the [[Table]] to. + * @param conf The [[QueryConfig]] to use. * @tparam T The data type that the [[TableSink]] expects. */ private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit /** + * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name. + * + * @param table The table to write to the TableSink. + * @param sinkTableName The name of the registered TableSink. + * @param conf The query configuration to use. + */ + private[flink] def insertInto(table: Table, sinkTableName: String, conf: QueryConfig): Unit = { + + // check that sink table exists + if (null == sinkTableName) throw TableException("Name of TableSink must not be null.") + if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.") + if (!isRegistered(sinkTableName)) { + throw TableException(s"No table was registered under the name $sinkTableName.") + } + + getTable(sinkTableName) match { + case s: TableSinkTable[_] => + val tableSink = s.tableSink + // validate schema of source table and table sink + val srcFieldTypes = table.getSchema.getTypes + val sinkFieldTypes = tableSink.getFieldTypes + + if (srcFieldTypes.length != sinkFieldTypes.length || + srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) { + + val srcFieldNames = table.getSchema.getColumnNames + val sinkFieldNames = tableSink.getFieldNames + + // format table and table sink schema strings + val srcSchema = srcFieldNames.zip(srcFieldTypes) + .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"} + .mkString("[", ", ", "]") + val sinkSchema = sinkFieldNames.zip(sinkFieldTypes) + .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"} + .mkString("[", ", ", "]") + + throw ValidationException( + s"Field types of query result and registered TableSink $sinkTableName do not match.\n" + + s"Query result schema: $srcSchema\n" + + s"TableSink schema: $sinkSchema") + } + // emit the table to the configured table sink + writeToSink(table, tableSink, conf) + case _ => + throw TableException(s"The table registered as $sinkTableName is not a TableSink. " + + s"You can only emit query results to a registered TableSink.") + } + } + + /** * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. * * @param name The name under which the table will be registered. @@ -554,10 +724,14 @@ abstract class TableEnvironment(val config: TableConfig) { * @param name The table name to check. * @return true, if a table is registered under the name, false otherwise. 
*/ - protected def isRegistered(name: String): Boolean = { + protected[flink] def isRegistered(name: String): Boolean = { rootSchema.getTableNames.contains(name) } + private def getTable(name: String): org.apache.calcite.schema.Table = { + rootSchema.getTable(name) + } + protected def getRowType(name: String): RelDataType = { rootSchema.getTable(name).getRowType(typeFactory) } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala index c8fbab7..4aa5543 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.api import _root_.java.io.Serializable + import org.apache.flink.api.common.time.Time class QueryConfig private[table] extends Serializable {} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 2298575..30ed98e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.plan.ProjectionTranslator._ import org.apache.flink.table.plan.logical.{Minus, _} +import org.apache.flink.table.plan.schema.TableSinkTable import org.apache.flink.table.sinks.TableSink import _root_.scala.annotation.varargs @@ -762,13 +763,10 @@ class Table( * @tparam T The data type that the [[TableSink]] expects. */ def writeToSink[T](sink: TableSink[T]): Unit = { - - def queryConfig = this.tableEnv match { - case s: StreamTableEnvironment => s.queryConfig - case b: BatchTableEnvironment => new BatchQueryConfig - case _ => null + val queryConfig = Option(this.tableEnv) match { + case None => null + case _ => this.tableEnv.queryConfig } - writeToSink(sink, queryConfig) } @@ -800,6 +798,37 @@ class Table( } /** + * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name. + * + * A batch [[Table]] can only be written to a + * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires an + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. + * + * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written. + */ + def insertInto(tableName: String): Unit = { + tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig) + } + + /** + * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name. 
+ * + * A batch [[Table]] can only be written to a + * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires an + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. + * + * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written. + * @param conf The [[QueryConfig]] to use. + */ + def insertInto(tableName: String, conf: QueryConfig): Unit = { + tableEnv.insertInto(this, tableName, conf) + } + + /** + * Groups the records of a table by assigning them to windows defined by a time or row interval. + * + * For streaming tables of infinite size, grouping into windows is required to define finite http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index beb2436..4d9acaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -57,7 +57,6 @@ class FlinkPlannerImpl( val defaultSchema: SchemaPlus = config.getDefaultSchema var validator: FlinkCalciteSqlValidator = _ - var validatedSqlNode: SqlNode = _ var root: RelRoot = _ private def ready() { @@ -85,16 +84,15 @@ validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory) validator.setIdentifierExpansion(true) try { - validatedSqlNode = validator.validate(sqlNode) + validator.validate(sqlNode) } catch { case e: RuntimeException => throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e) } - validatedSqlNode } - def rel(sql: SqlNode): RelRoot = { + def rel(validatedSqlNode: SqlNode): RelRoot = { try { assert(validatedSqlNode != null) val rexBuilder: RexBuilder = createRexBuilder http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala new file mode 100644 index 0000000..f5e80d5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Statistic +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sinks.TableSink + +/** Table which defines an external table via a [[TableSink]]. */ +class TableSinkTable[T]( + val tableSink: TableSink[T], + val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) + extends AbstractTable { + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildLogicalRowType(tableSink.getFieldNames, tableSink.getFieldTypes) + } + + /** + * Returns statistics of the current table + * + * @return statistics of the current table + */ + override def getStatistic: Statistic = statistic +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java index eb97afe..672b6fd 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java @@ -80,7 +80,7 @@ public class JavaTableSourceITCase extends TableProgramsCollectionTestBase { tableEnv.registerTableSource("persons", csvTable); Table result = tableEnv - .sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20"); + .sqlQuery("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20"); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java index 3c8a1cc..455e8ce 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java @@ -186,7 +186,7 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase { * @param expected Expected result. 
*/ private void checkSql(String query, String expected) throws Exception { - Table resultTable = tableEnv.sql(query); + Table resultTable = tableEnv.sqlQuery(query); DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class); List<Row> results = resultDataSet.collect(); TestBaseUtils.compareResultAsText(results, expected); @@ -204,12 +204,12 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase { }; // Execute first query and store results - Table resultTable1 = tableEnv.sql(query1); + Table resultTable1 = tableEnv.sqlQuery(query1); DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class); List<String> results1 = resultDataSet1.map(mapFunction).collect(); // Execute second query and store results - Table resultTable2 = tableEnv.sql(query2); + Table resultTable2 = tableEnv.sqlQuery(query2); DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class); List<String> results2 = resultDataSet2.map(mapFunction).collect(); http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java index c5a394a..f9693fd 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java @@ -61,7 +61,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase { String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," + "(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," + "(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); @@ -83,7 +83,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase { tableEnv.registerTable("T", in); String sqlQuery = "SELECT a, c FROM T"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); @@ -106,7 +106,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase { tableEnv.registerDataSet("DataSetTable", ds, "x, y, z"); String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); @@ -123,7 +123,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase { tableEnv.registerDataSet("AggTable", ds, "x, y, z"); String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); @@ -143,7 +143,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase { tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h"); String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e"; - Table result = tableEnv.sql(sqlQuery); + Table 
result = tableEnv.sqlQuery(sqlQuery); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); @@ -168,7 +168,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase { tableEnv.registerDataSet("t1", ds1, "a, b"); String sqlQuery = "SELECT b['foo'] FROM t1"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java index c6368d4..f3d0309 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java @@ -68,7 +68,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase { tableEnv.registerTable("MyTableRow", in); String sqlQuery = "SELECT a,c FROM MyTableRow"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink<Row>()); @@ -93,7 +93,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase { tableEnv.registerTable("MyTable", in); String sqlQuery = "SELECT * FROM MyTable"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink<Row>()); @@ -117,7 +117,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase { tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e"); String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink<Row>()); @@ -148,7 +148,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase { String sqlQuery = "SELECT * FROM T1 " + "UNION ALL " + "(SELECT a, b, c FROM T2 WHERE a < 3)"; - Table result = tableEnv.sql(sqlQuery); + Table result = tableEnv.sqlQuery(sqlQuery); DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink<Row>()); http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala index c9a7049..dde5569 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala @@ 
-31,7 +31,7 @@ class BatchTableEnvironmentTest extends TableTestBase { val util = batchTestUtil() val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c) - val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12") + val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12") val expected = unaryNode( "DataSetCalc", @@ -43,7 +43,7 @@ class BatchTableEnvironmentTest extends TableTestBase { val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f) - val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d") + val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table, $table2 WHERE c = d") val join = unaryNode( "DataSetJoin", http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala index 9aada9a..69b12b2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala @@ -34,6 +34,6 @@ class CalcValidationTest extends TableTestBase { val sqlQuery = "SELECT a, foo FROM MyTable" - util.tableEnv.sql(sqlQuery) + util.tableEnv.sqlQuery(sqlQuery) } } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala new file mode 100644 index 0000000..ef9e6a3 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.batch.sql.validation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{Types, ValidationException} +import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase} +import org.junit._ + +class InsertIntoValidationTest extends TableTestBase { + + @Test(expected = classOf[ValidationException]) + def testInconsistentLengthInsert(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c) + + val fieldNames = Array("d", "e") + val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG) + val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink + util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink) + + val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" + + // must fail because table sink schema has too few fields + util.tableEnv.sqlUpdate(sql) + } + + @Test(expected = classOf[ValidationException]) + def testUnmatchedTypesInsert(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c) + + val fieldNames = Array("d", "e", "f") + val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG) + val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink + util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink) + + val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" + + // must fail because types of table sink do not match query result + util.tableEnv.sqlUpdate(sql) + } + + @Test(expected = classOf[ValidationException]) + def testUnsupportedPartialInsert(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c) + + val fieldNames = Array("d", "e", "f") + val fieldTypes = util.tableEnv.scan("sourceTable").getSchema.getTypes + val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink + util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink) + + val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable" + + // must fail because partial insert is not supported yet. 
+ util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala index 90bcfec..d9e0e10 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala @@ -35,7 +35,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e" - util.tableEnv.sql(sqlQuery) + util.tableEnv.sqlQuery(sqlQuery) } @Test(expected = classOf[TableException]) @@ -46,7 +46,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[ValidationException]) @@ -57,7 +57,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -68,7 +68,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -79,7 +79,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -90,7 +90,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -101,7 +101,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -112,7 +112,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -123,7 +123,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } @Test(expected = classOf[TableException]) @@ -134,7 +134,7 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] 
} @Test(expected = classOf[TableException]) @@ -145,6 +145,6 @@ class JoinValidationTest extends TableTestBase { val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3" - util.tableEnv.sql(sqlQuery).toDataSet[Row] + util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row] } } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala index 7e72a21..dfbdd5a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala @@ -41,7 +41,7 @@ class OverWindowValidationTest extends TableTestBase { util.addFunction("overAgg", new OverAgg0) val sqlQuery = "SELECT overAgg(b, a) FROM T" - util.tableEnv.sql(sqlQuery) + util.tableEnv.sqlQuery(sqlQuery) } /** @@ -55,6 +55,6 @@ class OverWindowValidationTest extends TableTestBase { val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)" - util.tableEnv.sql(sqlQuery) + util.tableEnv.sqlQuery(sqlQuery) } }
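----------------------------------------------------------------------

Usage note (editor's sketch, not part of the committed patch): a minimal end-to-end example of the new SQL INSERT INTO path added by this commit. It assumes a Scala batch environment and reuses the in-memory test sink that this commit adds in MemoryTableSinkUtil; any other suitable TableSink registered via registerTableSink() works the same way.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.{TableEnvironment, Types}
    import org.apache.flink.table.utils.MemoryTableSinkUtil

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register a source table
    val source = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello")).toTable(tEnv, 'a, 'b, 'c)
    tEnv.registerTable("sourceTable", source)

    // register a sink table; the sink schema must have the same number and types of
    // fields as the query result, otherwise TableEnvironment.insertInto() throws the
    // ValidationException exercised by the InsertIntoValidationTests above
    val fieldNames = Array("d", "e", "f")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG, Types.STRING)
    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)

    // sqlUpdate() parses the statement, validates the SELECT part, and
    // routes the result into the registered sink via insertInto()
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
    env.execute()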

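Usage note (editor's sketch, not part of the committed patch): the same insert expressed through the new Table API method Table.insertInto(), continuing the example above. As the table.scala hunk shows, the one-argument variant picks up tableEnv.queryConfig internally; the two-argument variant takes an explicit QueryConfig, here obtained via tEnv.queryConfig as the validation tests do.

    // equivalent to the sqlUpdate() call above
    tEnv.scan("sourceTable")
      .select('a, 'b, 'c)
      .insertInto("targetTable")

    // or with an explicit query configuration
    tEnv.scan("sourceTable")
      .select('a, 'b, 'c)
      .insertInto("targetTable", tEnv.queryConfig)
    env.execute()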