http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
index d3f9b9f..cfc8067 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -34,6 +34,6 @@ class SortValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 }
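The change above is the commit's central rename: `TableEnvironment.sql(...)` becomes `sqlQuery(...)` for statements that return a `Table`. A minimal sketch of the renamed call, assuming the Flink 1.4-era Scala Table API; the table name and data below are illustrative, not part of the commit:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SqlQueryExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)

    // tEnv.sql(...) becomes tEnv.sqlQuery(...) for queries that produce a Table
    val result = tEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE b = 2")
    result.toDataSet[Row].print() // triggers execution and prints the rows
  }
}
```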
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2cfe931
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because TableSink accepts fewer fields.
+    util.tableEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because types of result and TableSink do not match.
+    util.tableEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 0943ea6..1b99679 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -41,7 +41,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
 
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+    val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
 
     val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table2 " +
+    val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table2 " +
       s"UNION ALL SELECT a, b, c FROM $table")
 
     val expected2 = binaryNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 640fd26..e066fe4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -206,7 +206,7 @@ class JoinTest extends TableTestBase {
     val query =
       "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
 
-    val resultTable = streamUtil.tableEnv.sql(query)
+    val resultTable = streamUtil.tableEnv.sqlQuery(query)
     val relNode = resultTable.getRelNode
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val rexNode = joinNode.getCondition
@@ -230,7 +230,7 @@ class JoinTest extends TableTestBase {
       query: String,
       expectCondStr: String): Unit = {
 
-    val resultTable = streamUtil.tableEnv.sql(query)
+    val resultTable = streamUtil.tableEnv.sqlQuery(query)
     val relNode = resultTable.getRelNode
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val joinInfo = joinNode.analyzeCondition
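The batch InsertIntoValidationTest above guards the Table API insert path: the sink registered via registerTableSink must match the inserted query in field count and types. A sketch of the passing case, with CsvTableSink and the output path as illustrative stand-ins for the test's in-memory sink:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object InsertIntoSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))
    tEnv.registerDataSet("sourceTable", ds, 'a, 'b, 'c)

    // three names with three matching types; two names, or a STRING/INT/LONG
    // mixture as in the tests above, would make insertInto throw a ValidationException
    val fieldNames = Array("d", "e", "f")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG, Types.STRING)
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes,
      new CsvTableSink("/tmp/targetTable.csv", ","))

    tEnv.scan("sourceTable")
      .select('a, 'b, 'c)
      .insertInto("targetTable")
    env.execute()
  }
}
```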
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..3045100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because table sink has too few fields.
+    tEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because field types of table sink are incompatible.
+    tEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedPartialInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+    // must fail because we don't support partial insert yet.
+    tEnv.sqlUpdate(sql)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
index 413cca7..d04b6d0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -43,7 +43,7 @@ class OverWindowValidationTest extends TableTestBase {
       "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
   }
 
   /**
@@ -55,7 +55,7 @@ class OverWindowValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
 
-    streamUtil.tableEnv.sql(sqlQuery)
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
   }
 
   /**
@@ -66,6 +66,6 @@ class OverWindowValidationTest extends TableTestBase {
     streamUtil.addFunction("overAgg", new OverAgg0)
 
     val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
-    streamUtil.tableEnv.sql(sqlQuery)
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
   }
 }
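The stream-SQL InsertIntoValidationTest above exercises the second new method, `sqlUpdate()`, which accepts INSERT INTO statements and returns nothing. A sketch of the matching happy path, assuming the same pre-1.9 API; the Csv sink and path are illustrative:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object SqlUpdateSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))
    tEnv.registerDataStream("sourceTable", stream, 'a, 'b, 'c)

    val fieldNames = Array("d", "e", "f")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG, Types.STRING)
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes,
      new CsvTableSink("/tmp/targetTable.csv", ","))

    // unlike sqlQuery(), sqlUpdate() returns Unit; mismatched or partial field
    // lists fail eagerly with a ValidationException, as the tests above assert
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
    env.execute()
  }
}
```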
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index dbc7d46..f58feed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -145,7 +145,7 @@ class CorrelateValidationTest extends TableTestBase {
       ),
       "Undefined function: NONEXIST")
 
     // SQL API call
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
       "No match found for function signature nonexist(<NUMERIC>)")
@@ -160,7 +160,7 @@ class CorrelateValidationTest extends TableTestBase {
     // SQL API call
     // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
       null,
       classOf[AssertionError])
@@ -172,7 +172,7 @@ class CorrelateValidationTest extends TableTestBase {
       "Given parameters of function 'FUNC2' do not match any signature")
 
     // SQL API call
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
       "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2fcfd6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because table sink has too few fields.
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because field types of table sink are incompatible.
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
index ab87cd3..628925b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.api.validation
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.stream.table.TestAppendSink
+import org.apache.flink.table.utils.MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 
@@ -39,4 +41,27 @@ class TableSinksValidationTest extends TableTestBase {
       .writeToSink(new TestAppendSink)
   }
 
+  @Test(expected = classOf[TableException])
+  def testSinkTableRegistrationUsingExistedTableName(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String)]("TargetTable", 'id, 'text)
+
+    val fieldNames = Array("a", "b", "c")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    // table name already registered
+    util.tableEnv
+      .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegistrationWithInconsistentFieldNamesAndTypesLength(): Unit = {
+    val util = streamTestUtil()
+
+    // inconsistent length of field names and types
+    val fieldNames = Array("a", "b", "c")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+
+    util.tableEnv
+      .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+  }
 }
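TableSinksValidationTest above pins down two preconditions of `registerTableSink`: the name must not already be registered, and the name and type arrays must have equal length. A sketch of both failure modes in one runnable program, with CsvTableSink as an illustrative stand-in for the test's in-memory sink:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.sinks.CsvTableSink

object RegisterTableSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val names = Array("a", "b", "c")
    val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
    tEnv.registerTableSink("TargetTable", names, types, new CsvTableSink("/tmp/t.csv", ","))

    // re-using the name, or passing arrays of different length, fails fast
    try {
      tEnv.registerTableSink("TargetTable", names, types.take(2),
        new CsvTableSink("/tmp/t2.csv", ","))
    } catch {
      case e: TableException => println(s"rejected as expected: ${e.getMessage}")
    }
  }
}
```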
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 44842f7..dd6e00e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.Future
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index a15f1d1..b4ad9ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -430,7 +430,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
 
     util.tableEnv.registerTable("NewTable", newTable)
 
@@ -448,7 +448,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
 
     util.tableEnv.registerTable("NewTable", newTable)

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 535bbf5..999a808 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -286,7 +286,7 @@ class RetractionRulesTest extends TableTestBase {
 class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
 
   def verifySqlTrait(query: String, expected: String): Unit = {
-    verifyTableTrait(tableEnv.sql(query), expected)
+    verifyTableTrait(tableEnv.sqlQuery(query), expected)
   }
 
   def verifyTableTrait(resultTable: Table, expected: String): Unit = {
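The ExpressionReductionRulesTest changes above use a pattern worth isolating: the result of `sqlQuery()` is an ordinary `Table` that can be re-registered under a new name for follow-up queries. A minimal sketch with an illustrative environment:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object DerivedTableSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    tEnv.registerDataSet("MyTable", env.fromElements((1, 1L, "Hi")), 'a, 'b, 'c)

    // the query result is a Table; register it to query it again by name
    val newTable = tEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
    tEnv.registerTable("NewTable", newTable)

    // the constant expression can be reduced when this query is planned
    tEnv.sqlQuery("SELECT a FROM NewTable").toDataSet[Row].print()
  }
}
```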
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index ab80c65..cfff326 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -275,7 +275,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
 
-    val result = util.tableEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+    val result = util.tableEnv.sqlQuery("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -300,7 +300,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
 
-    val result = util.tableEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+    val result = util.tableEnv.sqlQuery("SELECT MIN(proctime) FROM MyTable GROUP BY long")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -325,7 +325,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
 
-    val result = util.tableEnv.sql(
+    val result = util.tableEnv.sqlQuery(
       "SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
         "SUM(`int`) FROM MyTable " +
         "GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
@@ -355,7 +355,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
 
-    val result = util.tableEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+    val result = util.tableEnv.sqlQuery("SELECT MIN(rowtime), long FROM MyTable " +
      "GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
 
     val expected = unaryNode(
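The TimeIndicatorConversionTest hunks above all revolve around time indicators declared at registration time (`'proctime.proctime`, `'rowtime.rowtime`) and then used in SQL. A runnable sketch of the proctime variant, with illustrative stream contents and field names:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TimeIndicatorSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream = env.fromElements((1L, 1), (2L, 2))
    // 'proctime.proctime appends a processing-time attribute as the last field
    val table = stream.toTable(tEnv, 'id, 'num, 'proctime.proctime)
    tEnv.registerTable("MyTable", table)

    val result = tEnv.sqlQuery(
      "SELECT TUMBLE_END(proctime, INTERVAL '1' SECOND), SUM(num) " +
        "FROM MyTable GROUP BY TUMBLE(proctime, INTERVAL '1' SECOND)")

    result.toAppendStream[Row].print()
    env.execute()
  }
}
```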
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 39b8371..465a88c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -52,7 +52,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231,1,21,21,11"
     val results = result.toDataSet[Row].collect()
@@ -70,7 +70,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231"
     val results = result.toDataSet[Row].collect()
@@ -88,7 +88,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231"
     val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class AggregateITCase(
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
     val results = result.toDataSet[Row].collect()
@@ -128,7 +128,7 @@ class AggregateITCase(
     val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,3,2,1,3"
     val results = result.toDataSet[Row].collect()
@@ -147,7 +147,7 @@ class AggregateITCase(
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "5.5,7"
     val results = result.toDataSet[Row].collect()
@@ -165,7 +165,7 @@ class AggregateITCase(
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "2,2"
     val results = result.toDataSet[Row].collect()
@@ -187,7 +187,7 @@ class AggregateITCase(
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,3,2"
     val results = result.toDataSet[Row].collect()
@@ -205,7 +205,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231,21"
     val results = result.toDataSet[Row].collect()
@@ -223,7 +223,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
 
@@ -243,7 +243,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected =
       "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
@@ -280,9 +280,9 @@ class AggregateITCase(
 
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
-    val result2 = tEnv.sql(sqlQuery2)
-    val result3 = tEnv.sql(sqlQuery3)
+    val result = tEnv.sqlQuery(sqlQuery)
+    val result2 = tEnv.sqlQuery(sqlQuery2)
+    val result3 = tEnv.sqlQuery(sqlQuery3)
 
     val results = result.toDataSet[Row].collect()
     val expected = Seq.empty
@@ -315,7 +315,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected = Seq(
       "1,1,1,1,1", "2,2,1,2,2", "2,3,1,2,3",
@@ -348,7 +348,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected = Seq(
       "1,1,1,1,1","1,1,1,1,1",
       "2,5,2,2,2","2,5,2,2,2",
@@ -383,7 +383,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected = Seq(
       "2,10,39,6,3,7",
       "16,21,111,6,6,18"
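Every AggregateITCase hunk above follows the same shape: register a source, run `sqlQuery`, convert back to a `DataSet`, collect, compare. The pattern in isolation, with illustrative data:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object AggregateSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)

    val result = tEnv.sqlQuery("SELECT b, SUM(a), COUNT(c) FROM MyTable GROUP BY b")

    // collect() pulls the result rows to the client, as the assertions above do
    result.toDataSet[Row].collect().foreach(println)
  }
}
```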
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 711182c..b891a7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -53,7 +53,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -77,7 +77,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -101,7 +101,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -125,7 +125,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
@@ -146,7 +146,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    tEnv.sql(sqlQuery)
+    tEnv.sqlQuery(sqlQuery)
   }
 
   @Test
@@ -160,7 +160,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "\n"
     val results = result.toDataSet[Row].collect()
@@ -178,7 +178,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -201,7 +201,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
     val results = result.toDataSet[Row].collect()
@@ -219,7 +219,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
       "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
@@ -240,7 +240,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
     val results = result.toDataSet[Row].collect()
@@ -258,7 +258,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
       "9,4,Comment#3\n" + "17,6,Comment#11\n" +
@@ -281,7 +281,7 @@ class CalcITCase(
       Timestamp.valueOf("1984-07-12 14:34:24")))
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
       "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
@@ -300,7 +300,7 @@ class CalcITCase(
     val ds = env.fromElements("a", "b", "c")
     tEnv.registerDataSet("MyTable", ds, 'text)
 
-    val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+    val result = tEnv.sqlQuery("SELECT hashCode(text) FROM MyTable")
 
     val expected = "97\n98\n99"
     val results = result.toDataSet[Row].collect()
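The last CalcITCase hunk calls a user-defined `hashCode(text)` function from SQL. A sketch of how such a scalar function is wired up, assuming the same era's `ScalarFunction` API; the function body is illustrative:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

object HashCode extends ScalarFunction {
  def eval(s: String): Int = s.hashCode
}

object ScalarFunctionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    tEnv.registerDataSet("MyTable", env.fromElements("a", "b", "c"), 'text)
    tEnv.registerFunction("hashCode", HashCode)

    // "a".hashCode == 97, "b" == 98, "c" == 99 -- matching the expected output above
    tEnv.sqlQuery("SELECT hashCode(text) FROM MyTable").toDataSet[Row].print()
  }
}
```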
"I am fine.,IJK\n" @@ -130,7 +130,7 @@ class JoinITCase( tEnv.registerTable("Table3", ds1) tEnv.registerTable("Table5", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" + "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n" val results = result.toDataSet[Row].collect() @@ -150,7 +150,7 @@ class JoinITCase( tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c) tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "6,6" val results = result.toDataSet[Row].collect() @@ -170,7 +170,7 @@ class JoinITCase( tEnv.registerTable("Table3", ds1) tEnv.registerTable("Table5", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "6,6" val results = result.toDataSet[Row].collect() @@ -196,7 +196,7 @@ class JoinITCase( "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" + "null,IJK\n" + "null,JKL\n" + "null,KLM" - val results = tEnv.sql(sqlQuery).toDataSet[Row].collect() + val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -218,7 +218,7 @@ class JoinITCase( "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" + "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" + "null,IJK\n" + "null,JKL\n" + "null,KLM" - val results = tEnv.sql(sqlQuery).toDataSet[Row].collect() + val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -240,7 +240,7 @@ class JoinITCase( "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" + "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" + "null,IJK\n" + "null,JKL\n" + "null,KLM" - val results = tEnv.sql(sqlQuery).toDataSet[Row].collect() + val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -257,7 +257,7 @@ class JoinITCase( "3,1,1,Hi\n" + "3,2,2,Hello\n" + "3,3,2,Hello world" - val result = tEnv.sql(sqlQuery2).collect() + val result = tEnv.sqlQuery(sqlQuery2).collect() TestBaseUtils.compareResultAsText(result.asJava, expected) } @@ -274,7 +274,7 @@ class JoinITCase( "1,1,Hi,3\n" + "2,2,Hello,3\n" + "3,2,Hello world,3" - val result = tEnv.sql(sqlQuery1).collect() + val result = tEnv.sqlQuery(sqlQuery1).collect() TestBaseUtils.compareResultAsText(result.asJava, expected) } @@ -287,7 +287,7 @@ class JoinITCase( tEnv.registerTable("A", table) val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)" - val result = tEnv.sql(sqlQuery1).count() + val result = tEnv.sqlQuery(sqlQuery1).count() Assert.assertEquals(0, result) } @@ -305,7 +305,7 @@ class JoinITCase( tEnv.registerTable("B", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = Seq( "1,null", "2,null", "2,null", @@ -331,7 +331,7 @@ class JoinITCase( tEnv.registerTable("A", ds1) tEnv.registerTable("B", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = Seq( "1,null", "2,null", "2,null", "3,3", "3,3", "3,3", "4,null", "4,null", "4,null", @@ -355,7 +355,7 @@ class JoinITCase( tEnv.registerTable("A", ds1) tEnv.registerTable("B", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = 
Seq( "1,3", "2,3", "2,3", "3,null", "3,null", "3,null", "4,null", "4,null", "4,null", @@ -380,7 +380,7 @@ class JoinITCase( tEnv.registerTable("A", ds2) tEnv.registerTable("B", ds1) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = Seq( "2,null", "3,null", "1,null").mkString("\n") @@ -402,7 +402,7 @@ class JoinITCase( tEnv.registerTable("A", ds1) tEnv.registerTable("B", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = Seq( "1,null", "2,null", "2,null", "3,3", "3,3", @@ -427,7 +427,7 @@ class JoinITCase( tEnv.registerTable("A", ds1) tEnv.registerTable("B", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = Seq( "1,null", "2,null", "2,null", "3,null", "3,null", @@ -453,7 +453,7 @@ class JoinITCase( tEnv.registerTable("t1", ds1) tEnv.registerTable("t2", ds2) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = Seq( "1,null,null", "2,null,null", "2,null,null", @@ -481,7 +481,7 @@ class JoinITCase( val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)" - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x") val results = result.toDataSet[Row].collect().toList @@ -508,7 +508,7 @@ class JoinITCase( " UNNEST(tf.b) as A (x, y) " + "WHERE x > a" - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = List( "1,[(12,45.6), (2,45.612)],12,45.6", http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala index b0e6fe8..d965e0c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala @@ -52,7 +52,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" val results = result.toDataSet[Row].collect() @@ -72,7 +72,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hi\n" + "Hello\n" + "Hello world\n" val results = result.toDataSet[Row].collect() @@ -94,7 +94,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hi\n" + "Hallo\n" val results = result.toDataSet[Row].collect() @@ -115,7 +115,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "18" val results = result.toDataSet[Row].collect() @@ -135,7 +135,7 @@ class SetOperatorsITCase( 
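The final JoinITCase hunks above cover `UNNEST`, which turns each array element into a row joined to its parent. The query shape in isolation, with illustrative data and names:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object UnnestSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements(
      (1, Array("Hi", "w")),
      (2, Array("Hello", "k")),
      (3, Array("Hello world", "x")))
    tEnv.registerDataSet("T", ds, 'a, 'c)

    // yields (1,Hi), (1,w), (2,Hello), (2,k), (3,Hello world), (3,x)
    tEnv.sqlQuery("SELECT a, s FROM T, UNNEST(T.c) as A (s)").toDataSet[Row].print()
  }
}
```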
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hello\n" + "Hello world\n" val results = result.toDataSet[Row].collect() @@ -161,7 +161,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'c) tEnv.registerDataSet("t2", ds2, 'c) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "1\n1" val results = result.toDataSet[Row].collect() @@ -183,7 +183,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hi\n" val results = result.toDataSet[Row].collect() @@ -208,7 +208,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hi\n" + "Hello\n" val results = result.toDataSet[Row].collect() @@ -234,7 +234,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'c) tEnv.registerDataSet("t2", ds2, 'c) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "1\n2\n2" val results = result.toDataSet[Row].collect() @@ -254,7 +254,7 @@ class SetOperatorsITCase( tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) - val result = tEnv.sql(sqlQuery) + val result = tEnv.sqlQuery(sqlQuery) val expected = "Hello\n" + "Hello world\n" val results = result.toDataSet[Row].collect() @@ -271,7 +271,7 @@ class SetOperatorsITCase( tEnv.registerTable("Table3", ds1) tEnv.registerTable("Table5", ds2) - val result = tEnv.sql("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)") + val result = tEnv.sqlQuery("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)") val expected = Seq("1", "2", "2", "3", "3", "3").mkString("\n") val results = result.toDataSet[Row].collect() @@ -288,7 +288,7 @@ class SetOperatorsITCase( tEnv.registerTable("Table3", ds1) tEnv.registerTable("Table5", ds2) - val result = tEnv.sql("SELECT d IN (SELECT a FROM Table3) FROM Table5") + val result = tEnv.sqlQuery("SELECT d IN (SELECT a FROM Table3) FROM Table5") val expected = Seq("false", "false", "false", "false", "false", "false", "false", "false", "false", "true", "true", "true", "true", "true", "true").mkString("\n") http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala index 4672ec3..66943fc 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala @@ -62,7 +62,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode) val expected = sortExpectedly(tupleDataSetStrings) // squash all rows inside a partition into one element - val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => { + val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => { // the rows need to be copied in object reuse mode val 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
index 4672ec3..66943fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
@@ -62,7 +62,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -99,7 +99,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -130,7 +130,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -161,7 +161,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index b7f1bb1..be8278f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,8 +24,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -48,7 +50,7 @@ class TableEnvironmentITCase(
 
     val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
 
-    val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+    val result = tEnv.sqlQuery(sqlQuery).select('a.avg, 'b.sum, 'c.count)
 
     val expected = "15,65,12"
     val results = result.toDataSet[Row].collect()
@@ -68,7 +70,7 @@ class TableEnvironmentITCase(
 
     val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+    val result = tEnv.sqlQuery(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
 
     val expected = "16,60,12"
     val results = result.toDataSet[Row].collect()
@@ -85,11 +87,11 @@ class TableEnvironmentITCase(
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
-    val result1 = tEnv.sql(sqlQuery)
+    val result1 = tEnv.sqlQuery(sqlQuery)
     tEnv.registerTable("ResTable", result1)
 
     val sqlQuery2 = "SELECT count(aa) FROM ResTable"
-    val result2 = tEnv.sql(sqlQuery2)
+    val result2 = tEnv.sqlQuery(sqlQuery2)
 
     val expected = "6"
     val results = result2.toDataSet[Row].collect()
@@ -106,11 +108,34 @@ class TableEnvironmentITCase(
     val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello,true\n"
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 504ab90..187d096 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -44,7 +44,7 @@ class TableSourceITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerTableSource("csvTable", csvTable)
-    val results = tEnv.sql(
+    val results = tEnv.sqlQuery(
       "SELECT id, `first`, `last`, score FROM csvTable").collect()
 
     val expected = Seq(
@@ -67,7 +67,7 @@ class TableSourceITCase(
 
     tableEnv.registerTableSource("NestedPersons", nestedTable)
 
-    val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+    val result = tableEnv.sqlQuery("SELECT NestedPersons.firstName, NestedPersons.lastName," +
       "NestedPersons.address.street, NestedPersons.address.city AS city " +
       "FROM NestedPersons " +
       "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
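The new `testInsertIntoMemoryTable` above derives the sink schema from the source table (`tEnv.scan(...).getSchema.getTypes`) instead of spelling it out, so the INSERT INTO is guaranteed to line up. The same trick in isolation, with a Csv sink and path as illustrative stand-ins:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object SchemaDerivedSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    tEnv.registerDataSet("sourceTable", env.fromElements((1, 1L, "Hi")), 'a, 'b, 'c)

    val fieldNames = Array("d", "e", "f")
    // reuse the source's field types for the sink registration
    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes,
      new CsvTableSink("/tmp/targetTable.csv", ","))

    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
    env.execute() // the INSERT runs when the job is executed
  }
}
```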
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 116f690..e947c3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -446,7 +446,7 @@ class CalcITCase(
 
     val sqlQuery = "SELECT c FROM t1 where RichFunc2(c)='ABC#Hello'"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello"
     val results = result.toDataSet[Row].collect()
@@ -467,7 +467,7 @@ class CalcITCase(
 
     val sqlQuery = "SELECT c FROM t1 where RichFunc3(c)=true"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello"
     val results = result.toDataSet[Row].collect()
@@ -488,7 +488,7 @@ class CalcITCase(
     val sqlQuery = "SELECT c FROM t1 where " +
       "RichFunc2(c)='Abc#Hello' or RichFunc1(a)=3 and b=2"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\nHello world"
     val results = result.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 2e23161..725c580 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -162,6 +164,28 @@ class TableEnvironmentITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
 }
 
 object TableEnvironmentITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 24d8695..ec65cf7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
     tEnv.registerTable("MyTable", table)
 
-    val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+    val t = tEnv.sqlQuery("SELECT COUNT(`rowtime`) FROM MyTable " +
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
     val results = t.toAppendStream[Row]
@@ -292,7 +292,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     tEnv.registerTable("T1", table)
 
     val querySql = "select rowtime as ts, string as msg from T1"
 
-    val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+    val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
     results.addSink(new StreamITCase.StringSink[Pojo1])
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index ab7925b..e40da7a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -60,7 +60,7 @@ class JoinITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }
@@ -97,7 +97,7 @@ class JoinITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index cc47a69..4884513 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -110,7 +110,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " MIN(c) OVER (" +
       " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -153,7 +153,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -181,7 +181,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "as cnt1 from T1)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -215,7 +215,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row](queryConfig)
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -240,7 +240,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -302,7 +302,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -363,7 +363,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -431,7 +431,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -492,7 +492,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -553,7 +553,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -619,7 +619,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -681,7 +681,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -742,7 +742,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -814,7 +814,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 2c59f8c..19db2a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -87,7 +87,7 @@ class SortITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
     env.execute()
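The SqlITCase changes that follow are again mechanical sql to sqlQuery renames; what differs per test is the conversion applied to the resulting Table. As a short reminder of the two stream conversions used below (a sketch, assuming a registered table MyTable with fields a, b, c as in the tests):

    // append stream: every result row is final,
    // suitable for queries that never update earlier results
    val appended = tEnv.sqlQuery("SELECT a, c FROM MyTable").toAppendStream[Row]

    // retract stream of (Boolean, Row): true adds a row, false retracts one;
    // required for updating queries such as grouped aggregations
    val retracted = tEnv
      .sqlQuery("SELECT b, COUNT(a) FROM MyTable GROUP BY b")
      .toRetractStream[Row]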
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 5398c6d..2c82d9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,16 +18,21 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+
+import scala.collection.JavaConverters._
 import org.junit.Assert._
 import org.junit._
 
@@ -58,7 +63,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = ds.toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTableRow", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -79,7 +84,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
@@ -100,7 +105,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -121,7 +126,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -142,7 +147,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env)
     tEnv.registerDataStream("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -166,7 +171,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -193,7 +198,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -219,7 +224,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.get3TupleDataStream(env)
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -244,7 +249,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -276,7 +281,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -306,7 +311,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
@@ -365,5 +370,33 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-}
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "2,2,Hello,1970-01-01 00:00:00.002",
+      "3,2,Hello world,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index be876a8..30ada56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -43,7 +43,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("persons", csvTable)
 
-    tEnv.sql(
+    tEnv.sqlQuery(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
       .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink[Row])
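In the streaming testInsertIntoMemoryTable above, the rowtime attribute is declared with 'rowtime.rowtime and then written into a Types.SQL_TIMESTAMP sink field, so the event-time attribute leaves the Table API as a plain timestamp. A condensed sketch of that declaration, with names taken from the test and shown only for illustration:

    // assign ascending event-time timestamps on the DataStream, then
    // expose them as a rowtime attribute in the table schema
    val t = StreamTestData.getSmall3TupleDataStream(env)
      .assignAscendingTimestamps(x => x._2)
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    // selecting `rowtime` into a SQL_TIMESTAMP sink field materializes it
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")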
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 830359f..c5b82fe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.table
 
 import java.io.File
 import java.lang.{Boolean => JBool}
-import java.sql.Timestamp
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -37,6 +36,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
@@ -49,6 +49,36 @@ import scala.collection.JavaConverters._
 class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
+  def testInsertIntoRegisteredTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+      .where('a < 3 || 'a > 19)
+      .select('c, 't, 'b)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+  }
+
+  @Test
   def testStreamTableSink(): Unit = {
 
     val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
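The helper introduced next implements both BatchTableSink[Row] and AppendStreamTableSink[Row] in a single class, so one utility backs the batch and streaming tests above; emitted rows are appended, under synchronization, to a shared string list, which is why each test calls MemoryTableSinkUtil.clear first. The expected usage, as the tests show (a sketch only):

    MemoryTableSinkUtil.clear  // reset the shared state between tests
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes,
      new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink)
    // ... insertInto(...) or sqlUpdate("INSERT INTO ...") ...
    env.execute()
    val rows = MemoryTableSinkUtil.results  // one row.toString per emitted row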
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
new file mode 100644
index 0000000..29b2e94
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+object MemoryTableSinkUtil {
+  var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+  def clear = {
+    MemoryTableSinkUtil.results.clear()
+  }
+
+  final class UnsafeMemoryAppendTableSink
+    extends TableSinkBase[Row] with BatchTableSink[Row]
+    with AppendStreamTableSink[Row] {
+
+    override def getOutputType: TypeInformation[Row] = {
+      new RowTypeInfo(getFieldTypes, getFieldNames)
+    }
+
+    override protected def copy: TableSinkBase[Row] = {
+      new UnsafeMemoryAppendTableSink
+    }
+
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet.output(new MemoryCollectionOutputFormat)
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream.addSink(new MemoryAppendSink)
+    }
+  }
+
+  private class MemoryAppendSink extends RichSinkFunction[Row]() {
+
+    override def invoke(value: Row): Unit = {
+      results.synchronized {
+        results += value.toString
+      }
+    }
+  }
+
+  private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+
+    override def configure(parameters: Configuration): Unit = {}
+
+    override def open(taskNumber: Int, numTasks: Int): Unit = {}
+
+    override def writeRecord(record: Row): Unit = {
+      results.synchronized {
+        results += record.toString
+      }
+    }
+
+    override def close(): Unit = {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c4e2433..ff7c79d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils
 
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -40,4 +41,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
 
+  override def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = ???
 }
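With registerTableSink now part of the TableEnvironment contract, MockTableEnvironment has to provide an implementation; like the other members here it is stubbed with ???, so any test that accidentally calls it fails fast rather than silently succeeding. For reference, ??? is scala.Predef.???, which is defined as:

    // scala.Predef (standard library)
    def ??? : Nothing = throw new NotImplementedError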
