This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 147022cf7984b8e56d1e01e826d9640674837e68
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 15:13:47 2019 +0800

    [FLINK-12708][table] Expose getTableStats() interface in TableSource
---
 .../apache/flink/table/sources/TableSource.java    | 10 +++
 .../apache/flink/table/api/TableEnvironment.scala  | 16 ++++-
 .../flink/table/api/batch/TableStatsTest.xml       | 69 +++++++++++++++++++
 .../flink/table/api/batch/TableStatsTest.scala     | 80 ++++++++++++++++++++++
 .../apache/flink/table/util/TableTestBase.scala    | 12 +++-
 .../flink/table/catalog/DatabaseCalciteSchema.java |  2 +-
 .../flink/table/plan/QueryOperationConverter.java  |  2 +-
 .../flink/table/plan/stats/FlinkStatistic.scala    |  2 +-
 8 files changed, 185 insertions(+), 8 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
index 386240b..42326ae 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
@@ -23,9 +23,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableConnectorUtils;
 
+import java.util.Optional;
+
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
@@ -84,4 +87,11 @@ public interface TableSource<T> {
        default String explainSource() {
                return TableConnectorUtils.generateRuntimeName(getClass(), 
getTableSchema().getFieldNames());
        }
+
+       /**
+        * Returns the (optional) statistics for this {@link TableSource}.
+        */
+       default Optional<TableStats> getTableStats() {
+               return Optional.empty();
+       }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 3dbf25c..b603381 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -745,7 +745,13 @@ abstract class TableEnvironment(
     */
   def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
-    registerTableSourceInternal(name, tableSource, FlinkStatistic.UNKNOWN, 
replace = false)
+    registerTableSourceInternal(
+      name,
+      tableSource,
+      FlinkStatistic.builder()
+        .tableStats(tableSource.getTableStats.orElse(null))
+        .build(),
+      replace = false)
   }
 
   /**
@@ -758,7 +764,13 @@ abstract class TableEnvironment(
   def registerOrReplaceTableSource(name: String,
       tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
-    registerTableSourceInternal(name, tableSource, FlinkStatistic.UNKNOWN, 
replace = true)
+    registerTableSourceInternal(
+      name,
+      tableSource,
+      FlinkStatistic.builder()
+        .tableStats(tableSource.getTableStats.orElse(null))
+        .build(),
+      replace = true)
   }
 
   /**
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/TableStatsTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/TableStatsTest.xml
new file mode 100644
index 0000000..2026490
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/TableStatsTest.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testTableStatsWithSmallRightTable">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $3), LIKE($2, _UTF-16LE'He%'))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], 
isBroadcast=[true], build=[right])
+:- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
+:  +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], 
fields=[a, b, c])
++- Exchange(distribution=[broadcast])
+   +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], 
fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTableStatsWithSmallLeftTable">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $3), LIKE($2, _UTF-16LE'He%'))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], 
isBroadcast=[true], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
+:     +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], 
fields=[a, b, c])
++- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, 
e, f])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/TableStatsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/TableStatsTest.scala
new file mode 100644
index 0000000..2e3c5db
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/TableStatsTest.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch
+
+import org.apache.flink.table.api.{DataTypes, PlannerConfigOptions, 
TableConfigOptions, TableSchema}
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.util.{TableTestBase, TestTableSource}
+import org.junit.Test
+
+class TableStatsTest extends TableTestBase {
+
+  @Test
+  def testTableStatsWithSmallRightTable(): Unit = {
+    val util = batchTestUtil()
+    util.tableEnv.registerTableSource("x", createTableX(new 
TableStats(10000000L)))
+    util.tableEnv.registerTableSource("y", createTableY(new TableStats(100L)))
+    util.tableEnv.getConfig.getConf.setString(
+      TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS, 
"NestedLoopJoin,SortMergeJoin")
+    util.tableEnv.getConfig.getConf.setLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_HASH_JOIN_BROADCAST_THRESHOLD, 5000)
+    val sqlQuery =
+      """
+        |SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      """.stripMargin
+    // right table should be broadcast
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testTableStatsWithSmallLeftTable(): Unit = {
+    val util = batchTestUtil()
+    util.tableEnv.registerTableSource("x", createTableX(new TableStats(100L)))
+    util.tableEnv.registerTableSource("y", createTableY(new 
TableStats(10000000L)))
+    util.tableEnv.getConfig.getConf.setString(
+      TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS, 
"NestedLoopJoin,SortMergeJoin")
+    util.tableEnv.getConfig.getConf.setLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_HASH_JOIN_BROADCAST_THRESHOLD, 5000)
+    val sqlQuery =
+      """
+        |SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      """.stripMargin
+    // left table should be broadcast
+    util.verifyPlan(sqlQuery)
+  }
+
+  private def createTableX(tableStats: TableStats): TableSource[_] = {
+    val schema = TableSchema.builder()
+      .field("a", DataTypes.INT())    // 4
+      .field("b", DataTypes.BIGINT()) // 8
+      .field("c", DataTypes.STRING()) // 12
+      .build()
+    new TestTableSource(true, schema, Option(tableStats))
+  }
+
+  private def createTableY(tableStats: TableStats): TableSource[_] = {
+    val schema = TableSchema.builder()
+      .field("d", DataTypes.INT())
+      .field("e", DataTypes.BIGINT())
+      .field("f", DataTypes.STRING())
+      .build()
+    new TestTableSource(true, schema, Option(tableStats))
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 0d92a4d..6342018 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -33,7 +33,7 @@ import org.apache.flink.table.functions.{AggregateFunction, 
ScalarFunction, Tabl
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, 
FlinkStreamProgram}
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, 
TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
 import org.apache.flink.table.sinks._
@@ -47,13 +47,14 @@ import org.apache.flink.types.Row
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
-
 import org.apache.commons.lang3.SystemUtils
 
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TestName}
 
+import _root_.java.util.Optional
+
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -644,7 +645,7 @@ case class BatchTableTestUtil(test: TableTestBase) extends 
TableTestUtil(test) {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(isBatch: Boolean, schema: TableSchema)
+class TestTableSource(isBatch: Boolean, schema: TableSchema, tableStats: 
Option[TableStats] = None)
   extends StreamTableSource[BaseRow] {
 
   override def isBounded: Boolean = isBatch
@@ -661,4 +662,9 @@ class TestTableSource(isBatch: Boolean, schema: TableSchema)
   }
 
   override def getTableSchema: TableSchema = schema
+
+  /** Returns the statistics of the table, or an empty Optional if the 
statistics are unknown. */
+  override def getTableStats: Optional[TableStats] = {
+    Optional.ofNullable(tableStats.orNull)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 14ef5aa..356852f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -75,7 +75,7 @@ class DatabaseCalciteSchema implements Schema {
                                        .map(tableSource -> new 
TableSourceTable<>(
                                                tableSource,
                                                !connectorTable.isBatch(),
-                                               FlinkStatistic.UNKNOWN()))
+                                               
FlinkStatistic.of(tableSource.getTableStats().orElse(null))))
                                        .orElseThrow(() -> new 
TableException("Cannot query a sink only table."));
                        } else {
                                throw new TableException("Unsupported table 
type: " + table);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 3c3eff2..5bc8206 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -278,7 +278,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                        final Table relTable = new TableSourceTable<>(
                                tableSourceTable.getTableSource(),
                                !tableSourceTable.isBatch(),
-                               FlinkStatistic.UNKNOWN());
+                               
FlinkStatistic.of(tableSourceTable.getTableSource().getTableStats().orElse(null)));
 
                        CatalogReader catalogReader = (CatalogReader) 
relBuilder.getRelOptSchema();
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index 1acc91f..754509e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -86,6 +86,6 @@ object FlinkStatistic {
     * @param tableStats The table statistics.
     * @return The generated FlinkStatistic
     */
-  def of(tableStats: TableStats): FlinkStatistic = new 
FlinkStatistic(Some(tableStats))
+  def of(tableStats: TableStats): FlinkStatistic = new 
FlinkStatistic(Option(tableStats))
 
 }

Reply via email to