This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fa31a14  [FLINK-11994] [table-planner-blink] Introduce TableImpl and 
remove Table in flink-table-planner-blink
fa31a14 is described below

commit fa31a145b9167d1f237bd495e959fa54ea595ca1
Author: Jark Wu <[email protected]>
AuthorDate: Fri Mar 22 12:15:37 2019 +0800

    [FLINK-11994] [table-planner-blink] Introduce TableImpl and remove Table in 
flink-table-planner-blink
    
    This closes #8032
---
 .../flink/table/api/BatchTableEnvironment.scala    |   2 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   2 +-
 .../scala/org/apache/flink/table/api/Table.scala   |  41 -----
 .../apache/flink/table/api/TableEnvironment.scala  |   8 +-
 .../org/apache/flink/table/api/TableImpl.scala     | 166 +++++++++++++++++++++
 .../flink/table/api/TableEnvironmentTest.scala     |   4 +-
 .../table/plan/util/FlinkRelOptUtilTest.scala      |   5 +-
 .../apache/flink/table/util/TableTestBase.scala    |  15 +-
 8 files changed, 184 insertions(+), 59 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index db35650..95838cf 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -95,7 +95,7 @@ class BatchTableEnvironment(
     * @param extended Flag to include detailed optimizer estimates.
     */
   def explain(table: Table, extended: Boolean): String = {
-    val ast = table.getRelNode
+    val ast = table.asInstanceOf[TableImpl].getRelNode
     val optimizedNode = optimize(ast)
 
     val explainLevel = if (extended) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 36b9da6..f285214 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -107,7 +107,7 @@ abstract class StreamTableEnvironment(
     * @param extended Flag to include detailed optimizer estimates.
     */
   def explain(table: Table, extended: Boolean): String = {
-    val ast = table.getRelNode
+    val ast = table.asInstanceOf[TableImpl].getRelNode
     val optimizedNode = optimize(ast)
 
     val explainLevel = if (extended) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Table.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Table.scala
deleted file mode 100644
index 8be880f..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Table.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api
-
-import org.apache.calcite.rel.RelNode
-
-/**
-  * A Table is the core component of the Table API.
-  * Similar to how the batch and streaming APIs have DataSet and DataStream,
-  * the Table API is built around [[Table]].
-  *
-  * NOTE: This class is only a placeholder to support end-to-end tests for 
Blink planner.
-  * Will be removed when [[Table]] is moved to "flink-table-api-java" module.
-  *
-  * @param tableEnv The [[TableEnvironment]] to which the table is bound.
-  * @param relNode  The Calcite RelNode representation
-  */
-class Table(val tableEnv: TableEnvironment, relNode: RelNode) {
-
-  /**
-    * Returns the Calcite RelNode represent this Table.
-    */
-  def getRelNode: RelNode = relNode
-
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d1c2028..4c2bdc4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -181,13 +181,13 @@ abstract class TableEnvironment(val config: TableConfig) {
   def registerTable(name: String, table: Table): Unit = {
 
     // check that table belongs to this table environment
-    if (table.tableEnv != this) {
+    if (table.asInstanceOf[TableImpl].tableEnv != this) {
       throw new TableException(
         "Only tables that belong to this TableEnvironment can be registered.")
     }
 
     checkValidTableName(name)
-    val tableTable = new RelTable(table.getRelNode)
+    val tableTable = new RelTable(table.asInstanceOf[TableImpl].getRelNode)
     registerTableInternal(name, tableTable)
   }
 
@@ -247,7 +247,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       val table = schema.getTable(tableName)
       if (table != null) {
         val scan = relBuilder.scan(JArrays.asList(tablePath: _*)).build()
-        return Some(new Table(this, scan))
+        return Some(new TableImpl(this, scan))
       }
     }
     None
@@ -351,7 +351,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       val validated = planner.validate(parsed)
       // transform to a relational tree
       val relational = planner.rel(validated)
-      new Table(this, relational.project())
+      new TableImpl(this, relational.project())
     } else {
       throw new TableException(
         "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
new file mode 100644
index 0000000..a025f45
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.TemporalTableFunction
+
+/**
+  * The implementation of the [[Table]].
+  *
+  * NOTE: Currently, [[TableImpl]] is just a wrapper for RelNode
+  * and all the methods in the class are not implemented. This is
+  * used to support end-to-end tests for Blink planner. It will be
+  * implemented when we support full stack Table API for Blink planner.
+  *
+  * @param tableEnv The [[TableEnvironment]] to which the table is bound.
+  * @param relNode  The Calcite RelNode representation
+  */
+class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends 
Table {
+
+  /**
+    * Returns the Calcite RelNode represent this Table.
+    */
+  def getRelNode: RelNode = relNode
+
+  override def getSchema: TableSchema = ???
+
+  override def printSchema(): Unit = ???
+
+  override def select(fields: String): Table = ???
+
+  override def select(fields: Expression*): Table = ???
+
+  override def createTemporalTableFunction(
+    timeAttribute: String,
+    primaryKey: String): TemporalTableFunction = ???
+
+  override def createTemporalTableFunction(
+    timeAttribute: Expression,
+    primaryKey: Expression): TemporalTableFunction = ???
+
+  override def as(fields: String): Table = ???
+
+  override def as(fields: Expression*): Table = ???
+
+  override def filter(predicate: String): Table = ???
+
+  override def filter(predicate: Expression): Table = ???
+
+  override def where(predicate: String): Table = ???
+
+  override def where(predicate: Expression): Table = ???
+
+  override def groupBy(fields: String): GroupedTable = ???
+
+  override def groupBy(fields: Expression*): GroupedTable = ???
+
+  override def distinct(): Table = ???
+
+  override def join(right: Table): Table = ???
+
+  override def join(
+    right: Table,
+    joinPredicate: String): Table = ???
+
+  override def join(
+    right: Table,
+    joinPredicate: Expression): Table = ???
+
+  override def leftOuterJoin(right: Table): Table = ???
+
+  override def leftOuterJoin(
+    right: Table,
+    joinPredicate: String): Table = ???
+
+  override def leftOuterJoin(
+    right: Table,
+    joinPredicate: Expression): Table = ???
+
+  override def rightOuterJoin(
+    right: Table,
+    joinPredicate: String): Table = ???
+
+  override def rightOuterJoin(
+    right: Table,
+    joinPredicate: Expression): Table = ???
+
+  override def fullOuterJoin(
+    right: Table,
+    joinPredicate: String): Table = ???
+
+  override def fullOuterJoin(
+    right: Table,
+    joinPredicate: Expression): Table = ???
+
+  override def joinLateral(tableFunctionCall: String): Table = ???
+
+  override def joinLateral(tableFunctionCall: Expression): Table = ???
+
+  override def joinLateral(
+    tableFunctionCall: String,
+    joinPredicate: String): Table = ???
+
+  override def joinLateral(
+    tableFunctionCall: Expression,
+    joinPredicate: Expression): Table = ???
+
+  override def leftOuterJoinLateral(tableFunctionCall: String): Table = ???
+
+  override def leftOuterJoinLateral(tableFunctionCall: Expression): Table = ???
+
+  override def leftOuterJoinLateral(
+    tableFunctionCall: String,
+    joinPredicate: String): Table = ???
+
+  override def leftOuterJoinLateral(
+    tableFunctionCall: Expression,
+    joinPredicate: Expression): Table = ???
+
+  override def minus(right: Table): Table = ???
+
+  override def minusAll(right: Table): Table = ???
+
+  override def union(right: Table): Table = ???
+
+  override def unionAll(right: Table): Table = ???
+
+  override def intersect(right: Table): Table = ???
+
+  override def intersectAll(right: Table): Table = ???
+
+  override def orderBy(fields: String): Table = ???
+
+  override def orderBy(fields: Expression*): Table = ???
+
+  override def offset(offset: Int): Table = ???
+
+  override def fetch(fetch: Int): Table = ???
+
+  override def insertInto(tableName: String): Unit = ???
+
+  override def insertInto(
+    tableName: String,
+    conf: QueryConfig): Unit = ???
+
+  override def window(groupWindow: GroupWindow): GroupWindowedTable = ???
+
+  override def window(overWindows: OverWindow*): OverWindowedTable = ???
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e5981ef..7a5e747 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -51,7 +51,7 @@ class TableEnvironmentTest {
     val table = env.fromElements[(Int, Long, String, 
Boolean)]().toTable(tableEnv, 'a, 'b, 'c, 'd)
     tableEnv.registerTable("MyTable", table)
     val scanTable = tableEnv.scan("MyTable")
-    val actual = RelOptUtil.toString(scanTable.getRelNode)
+    val actual = 
RelOptUtil.toString(scanTable.asInstanceOf[TableImpl].getRelNode)
     val expected = "LogicalTableScan(table=[[MyTable]])\n"
     assertEquals(expected, actual)
 
@@ -66,7 +66,7 @@ class TableEnvironmentTest {
     val table = env.fromElements[(Int, Long, String, 
Boolean)]().toTable(tableEnv, 'a, 'b, 'c, 'd)
     tableEnv.registerTable("MyTable", table)
     val queryTable = tableEnv.sqlQuery("SELECT a, c, d FROM MyTable")
-    val actual = RelOptUtil.toString(queryTable.getRelNode)
+    val actual = 
RelOptUtil.toString(queryTable.asInstanceOf[TableImpl].getRelNode)
     val expected = "LogicalProject(a=[$0], c=[$2], d=[$3])\n" +
       "  LogicalTableScan(table=[[MyTable]])\n"
     assertEquals(expected, actual)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
index 73e283c..09d6c9a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/FlinkRelOptUtilTest.scala
@@ -19,9 +19,8 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableImpl}
 import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
-
 import org.apache.calcite.sql.SqlExplainLevel
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -44,7 +43,7 @@ class FlinkRelOptUtilTest {
         |SELECT * FROM t1 JOIN t2 ON t1.a = t2.a
       """.stripMargin
     val result = tableEnv.sqlQuery(sqlQuery)
-    val rel = result.getRelNode
+    val rel = result.asInstanceOf[TableImpl].getRelNode
 
     val expected1 =
       """
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 6b18035..fdb57ce 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -27,13 +27,12 @@ import org.apache.flink.streaming.api.{TimeCharacteristic, 
environment}
 import org.apache.flink.table.`type`.TypeConverters
 import org.apache.flink.table.api.java.{BatchTableEnvironment => 
JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => 
ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv, _}
-import org.apache.flink.table.api.{Table, TableEnvironment, TableException, 
TableSchema}
+import org.apache.flink.table.api.{BatchTableEnvironment => _, 
StreamTableEnvironment => _, _}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, 
TableFunction}
-import org.apache.flink.table.plan.util._
+import org.apache.flink.table.plan.util.{FlinkRelOptUtil, _}
 import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
-
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.commons.lang3.SystemUtils
@@ -63,10 +62,12 @@ abstract class TableTestBase {
   def batchTestUtil(): BatchTableTestUtil = BatchTableTestUtil(this)
 
   def verifyTableEquals(expected: Table, actual: Table): Unit = {
+    val expectedString = 
FlinkRelOptUtil.toString(expected.asInstanceOf[TableImpl].getRelNode)
+    val actualString = 
FlinkRelOptUtil.toString(actual.asInstanceOf[TableImpl].getRelNode)
     assertEquals(
       "Logical plans do not match",
-      
LogicalPlanFormatUtils.formatTempTableId(FlinkRelOptUtil.toString(expected.getRelNode)),
-      
LogicalPlanFormatUtils.formatTempTableId(FlinkRelOptUtil.toString(actual.getRelNode)))
+      LogicalPlanFormatUtils.formatTempTableId(expectedString),
+      LogicalPlanFormatUtils.formatTempTableId(actualString))
   }
 }
 
@@ -213,7 +214,7 @@ abstract class TableTestUtil(test: TableTestBase) {
       withRowType: Boolean,
       printPlanBefore: Boolean): Unit = {
     val table = getTableEnv.sqlQuery(sql)
-    val relNode = table.getRelNode
+    val relNode = table.asInstanceOf[TableImpl].getRelNode
     val optimizedPlan = getOptimizedPlan(relNode, explainLevel, withRowType = 
withRowType)
 
     assertEqualsOrExpand("sql", sql)
@@ -236,7 +237,7 @@ abstract class TableTestUtil(test: TableTestBase) {
       explainLevel: SqlExplainLevel,
       withRowType: Boolean,
       printPlanBefore: Boolean): Unit = {
-    val relNode = table.getRelNode
+    val relNode = table.asInstanceOf[TableImpl].getRelNode
     val optimizedPlan = getOptimizedPlan(relNode, explainLevel, withRowType = 
withRowType)
 
     if (printPlanBefore) {

Reply via email to