This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 3598a16  [FLINK-21345][table-planner] Fix bug of union all join 
temporal table
3598a16 is described below

commit 3598a16f4d2d46b75f15a4eb01610ecfe2640f1e
Author: zhangjun02 <[email protected]>
AuthorDate: Mon Jul 12 16:38:01 2021 +0800

    [FLINK-21345][table-planner] Fix bug of union all join temporal table
    
    (cherry picked from commit 24e6121d5f882e55dfc0616b1da81dc0b46f2d34)
---
 .../BatchCommonSubGraphBasedOptimizer.scala        |  8 +--
 .../StreamCommonSubGraphBasedOptimizer.scala       | 14 ++---
 .../optimize/program/FlinkOptimizeContext.scala    |  9 +--
 .../program/FlinkRelTimeIndicatorProgram.scala     |  2 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala | 26 ++++-----
 .../sql/TemporalTableFunctionJoinITCase.scala      | 68 ++++++++++++++++++++++
 6 files changed, 93 insertions(+), 34 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
index b838fa6..d423dbc 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.table.planner.plan.optimize
 
+import org.apache.calcite.rel.RelNode
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
-import org.apache.flink.table.planner.calcite.{FlinkContext, 
SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, 
SqlExprToRexConverterFactory}
 import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
 import 
org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContext, 
FlinkBatchProgram}
@@ -28,9 +29,6 @@ import 
org.apache.flink.table.planner.plan.schema.IntermediateRelTable
 import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
 
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.RexBuilder
-
 import java.util.Collections
 
 /**
@@ -97,7 +95,7 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
       override def getSqlExprToRexConverterFactory: 
SqlExprToRexConverterFactory =
         context.getSqlExprToRexConverterFactory
 
-      override def getRexBuilder: RexBuilder = 
planner.getRelBuilder.getRexBuilder
+      override def getFlinkRelBuilder: FlinkRelBuilder = planner.getRelBuilder
 
       override def needFinalTimeIndicatorConversion: Boolean = true
     })
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index fb9e10f..3960aac 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -18,25 +18,23 @@
 
 package org.apache.flink.table.planner.plan.optimize
 
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
-import org.apache.flink.table.planner.calcite.{FlinkContext, 
SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, 
SqlExprToRexConverterFactory}
 import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, 
MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, 
ModifyKindSet, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
+import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan,
 StreamPhysicalIntermediateTableScan, StreamPhysicalLegacyTableSourceScan, 
StreamPhysicalRel, StreamPhysicalTableSourceScan}
+import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import 
org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram, 
StreamOptimizeContext}
 import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
 
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex.RexBuilder
-
 import java.util
 import java.util.Collections
 import scala.collection.JavaConversions._
@@ -173,7 +171,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
       override def getSqlExprToRexConverterFactory: 
SqlExprToRexConverterFactory =
         context.getSqlExprToRexConverterFactory
 
-      override def getRexBuilder: RexBuilder = 
planner.getRelBuilder.getRexBuilder
+      override def getFlinkRelBuilder: FlinkRelBuilder = planner.getRelBuilder
 
       override def isUpdateBeforeRequired: Boolean = updateBeforeRequired
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
index 4fc0b034..7fadb0c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
@@ -18,17 +18,18 @@
 
 package org.apache.flink.table.planner.plan.optimize.program
 
-import org.apache.calcite.rex.RexBuilder
 import org.apache.flink.table.planner.calcite.FlinkContext
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
 
 /**
   * A FlinkOptimizeContext allows to obtain table environment information when 
optimizing.
   */
 trait FlinkOptimizeContext extends FlinkContext {
+
   /**
-    * Gets the Calcite [[RexBuilder]] defined in 
[[org.apache.flink.table.api.TableEnvironment]].
-    */
-  def getRexBuilder: RexBuilder
+   * Gets the [[FlinkRelBuilder]] defined in 
[[org.apache.flink.table.api.TableEnvironment]].
+   */
+  def getFlinkRelBuilder: FlinkRelBuilder
 
   /**
     * Returns true if the output node needs final TimeIndicator conversion
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
index f638b6c..2c46621 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode
 class FlinkRelTimeIndicatorProgram[OC <: FlinkOptimizeContext] extends 
FlinkOptimizeProgram[OC] {
 
   override def optimize(input: RelNode, context: OC): RelNode = {
-    val rexBuilder = Preconditions.checkNotNull(context.getRexBuilder)
+    val rexBuilder = 
Preconditions.checkNotNull(context.getFlinkRelBuilder.getRexBuilder)
     RelTimeIndicatorConverter.convert(input, rexBuilder, 
context.needFinalTimeIndicatorConversion)
   }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index 5ba2238..27889f1 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -19,24 +19,24 @@
 package org.apache.flink.table.planner.plan.rules.logical
 
 import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.{FieldReferenceExpression, _}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.{TemporalTableFunction, 
TemporalTableFunctionImpl}
 import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.TableSqlFunction
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeContext
 import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{makeProcTimeTemporalFunctionJoinConCall,
 makeRowTimeTemporalFunctionJoinConCall}
 import org.apache.flink.table.planner.plan.utils.{ExpandTableScanShuttle, 
RexDefaultVisitor}
+import org.apache.flink.table.planner.utils.ShortcutUtils
 import 
org.apache.flink.table.types.logical.LogicalTypeRoot.{TIMESTAMP_WITHOUT_TIME_ZONE,
 TIMESTAMP_WITH_LOCAL_TIME_ZONE}
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot, 
isProctimeAttribute}
 import org.apache.flink.util.Preconditions.checkState
 
 import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
-import org.apache.calcite.plan.hep.HepRelVertex
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptSchema}
-import org.apache.calcite.rel.{BiRel, RelNode, SingleRel}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
-import org.apache.calcite.rel.logical.{LogicalCorrelate}
+import org.apache.calcite.rel.logical.LogicalCorrelate
 import org.apache.calcite.rex._
 
 /**
@@ -97,7 +97,10 @@ class LogicalCorrelateToJoinFromTemporalTableFunctionRule
           .getUnderlyingHistoryTable
         val rexBuilder = cluster.getRexBuilder
 
-        val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema(leftNode))
+        val flinkContext = ShortcutUtils.unwrapContext(call.getPlanner)
+          .asInstanceOf[FlinkOptimizeContext]
+        val relBuilder = flinkContext.getFlinkRelBuilder
+
         val temporalTable: RelNode = 
relBuilder.queryOperation(underlyingHistoryTable).build()
         // expand QueryOperationCatalogViewTable in Table Scan
         val shuttle = new ExpandTableScanShuttle
@@ -148,15 +151,6 @@ class LogicalCorrelateToJoinFromTemporalTableFunctionRule
       rightDataTypeField.getType, rightReferencesOffset + 
rightDataTypeField.getIndex)
   }
 
-  /**
-    * Gets [[RelOptSchema]] from the leaf [[RelNode]] which holds a non-null 
[[RelOptSchema]].
-    */
-  private def getRelOptSchema(relNode: RelNode): RelOptSchema = relNode match {
-    case hep: HepRelVertex => getRelOptSchema(hep.getCurrentRel)
-    case single: SingleRel => getRelOptSchema(single.getInput)
-    case bi: BiRel => getRelOptSchema(bi.getLeft)
-    case _ => relNode.getTable.getRelOptSchema
-  }
 }
 
 object LogicalCorrelateToJoinFromTemporalTableFunctionRule {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
index afd4cec..7cc83ee 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
@@ -98,6 +98,74 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
   }
 
   @Test
+  def testProcessTimeInnerJoinWithConstantTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    val result = tEnv.sqlQuery("SELECT amount, currency, proctime() as 
proctime " +
+      "FROM (VALUES (1, 2.0)) AS T(amount, currency)").toAppendStream[Row]
+    result.addSink(new TestingAppendSink)
+    env.execute()
+  }
+
+  @Test
+  def testProcessTimeInnerJoinUnionAll(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val sqlQuery =
+      """
+        |SELECT
+        |  o.amount * r.rate AS amount
+        |FROM
+        |  Orders AS o,
+        |  LATERAL TABLE (Rates(o.proctime)) AS r
+        |WHERE r.currency = o.currency
+        |""".stripMargin
+
+    val ordersData = new mutable.MutableList[(Long, String)]
+    ordersData.+=((2L, "Euro"))
+    ordersData.+=((1L, "US Dollar"))
+    ordersData.+=((50L, "Yen"))
+    ordersData.+=((3L, "Euro"))
+    ordersData.+=((5L, "US Dollar"))
+
+    val ratesHistoryData = new mutable.MutableList[(String, Long)]
+    ratesHistoryData.+=(("US Dollar", 102L))
+    ratesHistoryData.+=(("Euro", 114L))
+    ratesHistoryData.+=(("Yen", 1L))
+    ratesHistoryData.+=(("Euro", 116L))
+    ratesHistoryData.+=(("Euro", 119L))
+
+    val orders1 = env
+      .fromCollection(ordersData)
+      .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
+    val orders2 = env
+      .fromCollection(ordersData)
+      .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
+    val ratesHistory = env
+      .fromCollection(ratesHistoryData)
+      .toTable(tEnv, 'currency, 'rate, 'proctime.proctime)
+
+    tEnv.registerTable("Orders1", orders1)
+    tEnv.registerTable("Orders2", orders2)
+    tEnv.registerTable("RatesHistory", ratesHistory)
+
+    tEnv.registerFunction(
+      "Rates",
+      ratesHistory.createTemporalTableFunction($"proctime", $"currency"))
+    tEnv.registerTable(
+      "Orders",
+      tEnv.sqlQuery("SELECT * FROM Orders1 UNION ALL SELECT * FROM Orders2"))
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new TestingAppendSink)
+    env.execute()
+  }
+
+  @Test
   def testEventTimeInnerJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)

Reply via email to