This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fa31a14 [FLINK-11994] [table-planner-blink] Introduce TableImpl and
remove Table in flink-table-planner-blink
fa31a14 is described below
commit fa31a145b9167d1f237bd495e959fa54ea595ca1
Author: Jark Wu <[email protected]>
AuthorDate: Fri Mar 22 12:15:37 2019 +0800
[FLINK-11994] [table-planner-blink] Introduce TableImpl and remove Table in
flink-table-planner-blink
This closes #8032
---
.../flink/table/api/BatchTableEnvironment.scala | 2 +-
.../flink/table/api/StreamTableEnvironment.scala | 2 +-
.../scala/org/apache/flink/table/api/Table.scala | 41 -----
.../apache/flink/table/api/TableEnvironment.scala | 8 +-
.../org/apache/flink/table/api/TableImpl.scala | 166 +++++++++++++++++++++
.../flink/table/api/TableEnvironmentTest.scala | 4 +-
.../table/plan/util/FlinkRelOptUtilTest.scala | 5 +-
.../apache/flink/table/util/TableTestBase.scala | 15 +-
8 files changed, 184 insertions(+), 59 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index db35650..95838cf 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -95,7 +95,7 @@ class BatchTableEnvironment(
* @param extended Flag to include detailed optimizer estimates.
*/
def explain(table: Table, extended: Boolean): String = {
- val ast = table.getRelNode
+ val ast = table.asInstanceOf[TableImpl].getRelNode
val optimizedNode = optimize(ast)
val explainLevel = if (extended) {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 36b9da6..f285214 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -107,7 +107,7 @@ abstract class StreamTableEnvironment(
* @param extended Flag to include detailed optimizer estimates.
*/
def explain(table: Table, extended: Boolean): String = {
- val ast = table.getRelNode
+ val ast = table.asInstanceOf[TableImpl].getRelNode
val optimizedNode = optimize(ast)
val explainLevel = if (extended) {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Table.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Table.scala
deleted file mode 100644
index 8be880f..0000000
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Table.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api
-
-import org.apache.calcite.rel.RelNode
-
-/**
- * A Table is the core component of the Table API.
- * Similar to how the batch and streaming APIs have DataSet and DataStream,
- * the Table API is built around [[Table]].
- *
- * NOTE: This class is only a placeholder to support end-to-end tests for
Blink planner.
- * Will be removed when [[Table]] is moved to "flink-table-api-java" module.
- *
- * @param tableEnv The [[TableEnvironment]] to which the table is bound.
- * @param relNode The Calcite RelNode representation
- */
-class Table(val tableEnv: TableEnvironment, relNode: RelNode) {
-
- /**
- * Returns the Calcite RelNode represent this Table.
- */
- def getRelNode: RelNode = relNode
-
-}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d1c2028..4c2bdc4 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -181,13 +181,13 @@ abstract class TableEnvironment(val config: TableConfig) {
def registerTable(name: String, table: Table): Unit = {
// check that table belongs to this table environment
- if (table.tableEnv != this) {
+ if (table.asInstanceOf[TableImpl].tableEnv != this) {
throw new TableException(
"Only tables that belong to this TableEnvironment can be registered.")
}
checkValidTableName(name)
- val tableTable = new RelTable(table.getRelNode)
+ val tableTable = new RelTable(table.asInstanceOf[TableImpl].getRelNode)
registerTableInternal(name, tableTable)
}
@@ -247,7 +247,7 @@ abstract class TableEnvironment(val config: TableConfig) {
val table = schema.getTable(tableName)
if (table != null) {
val scan = relBuilder.scan(JArrays.asList(tablePath: _*)).build()
- return Some(new Table(this, scan))
+ return Some(new TableImpl(this, scan))
}
}
None
@@ -351,7 +351,7 @@ abstract class TableEnvironment(val config: TableConfig) {
val validated = planner.validate(parsed)
// transform to a relational tree
val relational = planner.rel(validated)
- new Table(this, relational.project())
+ new TableImpl(this, relational.project())
} else {
throw new TableException(
"Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
new file mode 100644
index 0000000..a025f45
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.TemporalTableFunction
+
+/**
+ * The implementation of [[Table]].
+ *
+ * NOTE: Currently, [[TableImpl]] is just a wrapper for RelNode
+ * and all the methods in the class are not implemented. This is
+ * used to support end-to-end tests for Blink planner. It will be
+ * implemented when we support full stack Table API for Blink planner.
+ *
+ * @param tableEnv The [[TableEnvironment]] to which the table is bound.
+ * @param relNode The Calcite RelNode representation
+ */
+class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends
Table {
+
+ /**
+ * Returns the Calcite RelNode represent this Table.
+ */
+ def getRelNode: RelNode = relNode
+
+ override def getSchema: TableSchema = ???
+
+ override def printSchema(): Unit = ???
+
+ override def select(fields: String): Table = ???
+
+ override def select(fields: Expression*): Table = ???
+
+ override def createTemporalTableFunction(
+ timeAttribute: String,
+ primaryKey: String): TemporalTableFunction = ???
+
+ override def createTemporalTableFunction(
+ timeAttribute: Expression,
+ primaryKey: Expression): TemporalTableFunction = ???
+
+ override def as(fields: String): Table = ???
+
+ override def as(fields: Expression*): Table = ???
+
+ override def filter(predicate: String): Table = ???
+
+ override def filter(predicate: Expression): Table = ???
+
+ override def where(predicate: String): Table = ???
+
+ override def where(predicate: Expression): Table = ???
+
+ override def groupBy(fields: String): GroupedTable = ???
+
+ override def groupBy(fields: Expression*): GroupedTable = ???
+
+ override def distinct(): Table = ???
+
+ override def join(right: Table): Table = ???
+
+ override def join(
+ right: Table,
+ joinPredicate: String): Table = ???
+
+ override def join(
+ right: Table,
+ joinPredicate: Expression): Table = ???
+
+ override def leftOuterJoin(right: Table): Table = ???
+
+ override def leftOuterJoin(
+ right: Table,
+ joinPredicate: String): Table = ???
+
+ override def leftOuterJoin(
+ right: Table,
+ joinPredicate: Expression): Table = ???
+
+ override def rightOuterJoin(
+ right: Table,
+ joinPredicate: String): Table = ???
+
+ override def rightOuterJoin(
+ right: Table,
+ joinPredicate: Expression): Table = ???
+
+ override def fullOuterJoin(
+ right: Table,
+ joinPredicate: String): Table = ???
+
+ override def fullOuterJoin(
+ right: Table,
+ joinPredicate: Expression): Table = ???
+
+ override def joinLateral(tableFunctionCall: String): Table = ???
+
+ override def joinLateral(tableFunctionCall: Expression): Table = ???
+
+ override def joinLateral(
+ tableFunctionCall: String,
+ joinPredicate: String): Table = ???
+
+ override def joinLateral(
+ tableFunctionCall: Expression,
+ joinPredicate: Expression): Table = ???
+
+ override def leftOuterJoinLateral(tableFunctionCall: String): Table = ???
+
+ override def leftOuterJoinLateral(tableFunctionCall: Expression): Table = ???
+
+ override def leftOuterJoinLateral(
+ tableFunctionCall: String,
+ joinPredicate: String): Table = ???
+
+ override def leftOuterJoinLateral(
+ tableFunctionCall: Expression,
+ joinPredicate: Expression): Table = ???
+
+ override def minus(right: Table): Table = ???
+
+ override def minusAll(right: Table): Table = ???
+
+ override def union(right: Table): Table = ???
+
+ override def unionAll(right: Table): Table = ???
+
+ override def intersect(right: Table): Table = ???
+
+ override def intersectAll(right: Table): Table = ???
+
+ override def orderBy(fields: String): Table = ???
+
+ override def orderBy(fields: Expression*): Table = ???
+
+ override def offset(offset: Int): Table = ???
+
+ override def fetch(fetch: Int): Table = ???
+
+ override def insertInto(tableName: String): Unit = ???
+
+ override def insertInto(
+ tableName: String,
+ conf: QueryConfig): Unit = ???
+
+ override def window(groupWindow: GroupWindow): GroupWindowedTable = ???
+
+ override def window(overWindows: OverWindow*): OverWindowedTable = ???
+}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e5981ef..7a5e747 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -51,7 +51,7 @@ class TableEnvironmentTest {
val table = env.fromElements[(Int, Long, String,
Boolean)]().toTable(tableEnv, 'a, 'b, 'c, 'd)
tableEnv.registerTable("MyTable", table)
val scanTable = tableEnv.scan("MyTable")
- val actual = RelOptUtil.toString(scanTable.getRelNode)
+ val actual =
RelOptUtil.toString(scanTable.asInstanceOf[TableImpl].getRelNode)
val expected = "LogicalTableScan(table=[[MyTable]])\n"
assertEquals(expected, actual)
@@ -66,7 +66,7 @@ class TableEnvironmentTest {
val table = env.fromElements[(Int, Long, String,
Boolean)]().toTable(tableEnv, 'a, 'b, 'c, 'd)
tableEnv.registerTable("MyTable", table)
val queryTable = tableEnv.sqlQuery("SELECT a, c, d FROM MyTable")
- val actual = RelOptUtil.toString(queryTable.getRelNode)
+ val actual =
RelOptUtil.toString(queryTable.asInstanceOf[TableImpl].getRelNode)
val expected = "LogicalProject(a=[$0], c=[$2], d=[$3])\n" +
" LogicalTableScan(table=[[MyTable]])\n"
assertEquals(expected, actual)
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
index 73e283c..09d6c9a 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
@@ -19,9 +19,8 @@ package org.apache.flink.table.plan.util
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableImpl}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
-
import org.apache.calcite.sql.SqlExplainLevel
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -44,7 +43,7 @@ class FlinkRelOptUtilTest {
|SELECT * FROM t1 JOIN t2 ON t1.a = t2.a
""".stripMargin
val result = tableEnv.sqlQuery(sqlQuery)
- val rel = result.getRelNode
+ val rel = result.asInstanceOf[TableImpl].getRelNode
val expected1 =
"""
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 6b18035..fdb57ce 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -27,13 +27,12 @@ import org.apache.flink.streaming.api.{TimeCharacteristic,
environment}
import org.apache.flink.table.`type`.TypeConverters
import org.apache.flink.table.api.java.{BatchTableEnvironment =>
JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment =>
ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv, _}
-import org.apache.flink.table.api.{Table, TableEnvironment, TableException,
TableSchema}
+import org.apache.flink.table.api.{BatchTableEnvironment => _,
StreamTableEnvironment => _, _}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction,
TableFunction}
-import org.apache.flink.table.plan.util._
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, _}
import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
import org.apache.flink.table.typeutils.BaseRowTypeInfo
-
import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql.SqlExplainLevel
import org.apache.commons.lang3.SystemUtils
@@ -63,10 +62,12 @@ abstract class TableTestBase {
def batchTestUtil(): BatchTableTestUtil = BatchTableTestUtil(this)
def verifyTableEquals(expected: Table, actual: Table): Unit = {
+ val expectedString =
FlinkRelOptUtil.toString(expected.asInstanceOf[TableImpl].getRelNode)
+ val actualString =
FlinkRelOptUtil.toString(actual.asInstanceOf[TableImpl].getRelNode)
assertEquals(
"Logical plans do not match",
-
LogicalPlanFormatUtils.formatTempTableId(FlinkRelOptUtil.toString(expected.getRelNode)),
-
LogicalPlanFormatUtils.formatTempTableId(FlinkRelOptUtil.toString(actual.getRelNode)))
+ LogicalPlanFormatUtils.formatTempTableId(expectedString),
+ LogicalPlanFormatUtils.formatTempTableId(actualString))
}
}
@@ -213,7 +214,7 @@ abstract class TableTestUtil(test: TableTestBase) {
withRowType: Boolean,
printPlanBefore: Boolean): Unit = {
val table = getTableEnv.sqlQuery(sql)
- val relNode = table.getRelNode
+ val relNode = table.asInstanceOf[TableImpl].getRelNode
val optimizedPlan = getOptimizedPlan(relNode, explainLevel, withRowType =
withRowType)
assertEqualsOrExpand("sql", sql)
@@ -236,7 +237,7 @@ abstract class TableTestUtil(test: TableTestBase) {
explainLevel: SqlExplainLevel,
withRowType: Boolean,
printPlanBefore: Boolean): Unit = {
- val relNode = table.getRelNode
+ val relNode = table.asInstanceOf[TableImpl].getRelNode
val optimizedPlan = getOptimizedPlan(relNode, explainLevel, withRowType =
withRowType)
if (printPlanBefore) {