This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 3598a16 [FLINK-21345][table-planner] Fix bug of union all join temporal table
3598a16 is described below
commit 3598a16f4d2d46b75f15a4eb01610ecfe2640f1e
Author: zhangjun02 <[email protected]>
AuthorDate: Mon Jul 12 16:38:01 2021 +0800
[FLINK-21345][table-planner] Fix bug of union all join temporal table
(cherry picked from commit 24e6121d5f882e55dfc0616b1da81dc0b46f2d34)
---
.../BatchCommonSubGraphBasedOptimizer.scala | 8 +--
.../StreamCommonSubGraphBasedOptimizer.scala | 14 ++---
.../optimize/program/FlinkOptimizeContext.scala | 9 +--
.../program/FlinkRelTimeIndicatorProgram.scala | 2 +-
...relateToJoinFromTemporalTableFunctionRule.scala | 26 ++++-----
.../sql/TemporalTableFunctionJoinITCase.scala | 68 ++++++++++++++++++++++
6 files changed, 93 insertions(+), 34 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
index b838fa6..d423dbc 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
@@ -18,9 +18,10 @@
package org.apache.flink.table.planner.plan.optimize
+import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
-import org.apache.flink.table.planner.calcite.{FlinkContext,
SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder,
SqlExprToRexConverterFactory}
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
import
org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContext,
FlinkBatchProgram}
@@ -28,9 +29,6 @@ import
org.apache.flink.table.planner.plan.schema.IntermediateRelTable
import org.apache.flink.table.planner.utils.TableConfigUtils
import org.apache.flink.util.Preconditions
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.RexBuilder
-
import java.util.Collections
/**
@@ -97,7 +95,7 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
override def getSqlExprToRexConverterFactory:
SqlExprToRexConverterFactory =
context.getSqlExprToRexConverterFactory
- override def getRexBuilder: RexBuilder =
planner.getRelBuilder.getRexBuilder
+ override def getFlinkRelBuilder: FlinkRelBuilder = planner.getRelBuilder
override def needFinalTimeIndicatorConversion: Boolean = true
})
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index fb9e10f..3960aac 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -18,25 +18,23 @@
package org.apache.flink.table.planner.plan.optimize
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableScan
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
-import org.apache.flink.table.planner.calcite.{FlinkContext,
SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder,
SqlExprToRexConverterFactory}
import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval,
MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode,
ModifyKindSet, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
+import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan,
StreamPhysicalIntermediateTableScan, StreamPhysicalLegacyTableSourceScan,
StreamPhysicalRel, StreamPhysicalTableSourceScan}
+import org.apache.flink.table.planner.plan.nodes.physical.stream._
import
org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram,
StreamOptimizeContext}
import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.utils.TableConfigUtils
import org.apache.flink.util.Preconditions
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rex.RexBuilder
-
import java.util
import java.util.Collections
import scala.collection.JavaConversions._
@@ -173,7 +171,7 @@ class StreamCommonSubGraphBasedOptimizer(planner:
StreamPlanner)
override def getSqlExprToRexConverterFactory:
SqlExprToRexConverterFactory =
context.getSqlExprToRexConverterFactory
- override def getRexBuilder: RexBuilder =
planner.getRelBuilder.getRexBuilder
+ override def getFlinkRelBuilder: FlinkRelBuilder = planner.getRelBuilder
override def isUpdateBeforeRequired: Boolean = updateBeforeRequired
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
index 4fc0b034..7fadb0c 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkOptimizeContext.scala
@@ -18,17 +18,18 @@
package org.apache.flink.table.planner.plan.optimize.program
-import org.apache.calcite.rex.RexBuilder
import org.apache.flink.table.planner.calcite.FlinkContext
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
/**
* A FlinkOptimizeContext allows to obtain table environment information when
optimizing.
*/
trait FlinkOptimizeContext extends FlinkContext {
+
/**
- * Gets the Calcite [[RexBuilder]] defined in
[[org.apache.flink.table.api.TableEnvironment]].
- */
- def getRexBuilder: RexBuilder
+ * Gets the [[FlinkRelBuilder]] defined in
[[org.apache.flink.table.api.TableEnvironment]].
+ */
+ def getFlinkRelBuilder: FlinkRelBuilder
/**
* Returns true if the output node needs final TimeIndicator conversion
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
index f638b6c..2c46621 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRelTimeIndicatorProgram.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode
class FlinkRelTimeIndicatorProgram[OC <: FlinkOptimizeContext] extends
FlinkOptimizeProgram[OC] {
override def optimize(input: RelNode, context: OC): RelNode = {
- val rexBuilder = Preconditions.checkNotNull(context.getRexBuilder)
+ val rexBuilder =
Preconditions.checkNotNull(context.getFlinkRelBuilder.getRexBuilder)
RelTimeIndicatorConverter.convert(input, rexBuilder,
context.needFinalTimeIndicatorConversion)
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index 5ba2238..27889f1 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -19,24 +19,24 @@
package org.apache.flink.table.planner.plan.rules.logical
import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.{FieldReferenceExpression, _}
+import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.{TemporalTableFunction,
TemporalTableFunctionImpl}
import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.functions.utils.TableSqlFunction
+import
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeContext
import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{makeProcTimeTemporalFunctionJoinConCall,
makeRowTimeTemporalFunctionJoinConCall}
import org.apache.flink.table.planner.plan.utils.{ExpandTableScanShuttle,
RexDefaultVisitor}
+import org.apache.flink.table.planner.utils.ShortcutUtils
import
org.apache.flink.table.types.logical.LogicalTypeRoot.{TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE}
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot,
isProctimeAttribute}
import org.apache.flink.util.Preconditions.checkState
import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
-import org.apache.calcite.plan.hep.HepRelVertex
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptSchema}
-import org.apache.calcite.rel.{BiRel, RelNode, SingleRel}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
-import org.apache.calcite.rel.logical.{LogicalCorrelate}
+import org.apache.calcite.rel.logical.LogicalCorrelate
import org.apache.calcite.rex._
/**
@@ -97,7 +97,10 @@ class LogicalCorrelateToJoinFromTemporalTableFunctionRule
.getUnderlyingHistoryTable
val rexBuilder = cluster.getRexBuilder
- val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema(leftNode))
+ val flinkContext = ShortcutUtils.unwrapContext(call.getPlanner)
+ .asInstanceOf[FlinkOptimizeContext]
+ val relBuilder = flinkContext.getFlinkRelBuilder
+
val temporalTable: RelNode =
relBuilder.queryOperation(underlyingHistoryTable).build()
// expand QueryOperationCatalogViewTable in Table Scan
val shuttle = new ExpandTableScanShuttle
@@ -148,15 +151,6 @@ class LogicalCorrelateToJoinFromTemporalTableFunctionRule
rightDataTypeField.getType, rightReferencesOffset +
rightDataTypeField.getIndex)
}
- /**
- * Gets [[RelOptSchema]] from the leaf [[RelNode]] which holds a non-null
[[RelOptSchema]].
- */
- private def getRelOptSchema(relNode: RelNode): RelOptSchema = relNode match {
- case hep: HepRelVertex => getRelOptSchema(hep.getCurrentRel)
- case single: SingleRel => getRelOptSchema(single.getInput)
- case bi: BiRel => getRelOptSchema(bi.getLeft)
- case _ => relNode.getTable.getRelOptSchema
- }
}
object LogicalCorrelateToJoinFromTemporalTableFunctionRule {
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
index afd4cec..7cc83ee 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
@@ -98,6 +98,74 @@ class TemporalTableFunctionJoinITCase(state:
StateBackendMode)
}
@Test
+ def testProcessTimeInnerJoinWithConstantTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+ env.setParallelism(1)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+ val result = tEnv.sqlQuery("SELECT amount, currency, proctime() as
proctime " +
+ "FROM (VALUES (1, 2.0)) AS T(amount, currency)").toAppendStream[Row]
+ result.addSink(new TestingAppendSink)
+ env.execute()
+ }
+
+ @Test
+ def testProcessTimeInnerJoinUnionAll(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+ env.setParallelism(1)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val sqlQuery =
+ """
+ |SELECT
+ | o.amount * r.rate AS amount
+ |FROM
+ | Orders AS o,
+ | LATERAL TABLE (Rates(o.proctime)) AS r
+ |WHERE r.currency = o.currency
+ |""".stripMargin
+
+ val ordersData = new mutable.MutableList[(Long, String)]
+ ordersData.+=((2L, "Euro"))
+ ordersData.+=((1L, "US Dollar"))
+ ordersData.+=((50L, "Yen"))
+ ordersData.+=((3L, "Euro"))
+ ordersData.+=((5L, "US Dollar"))
+
+ val ratesHistoryData = new mutable.MutableList[(String, Long)]
+ ratesHistoryData.+=(("US Dollar", 102L))
+ ratesHistoryData.+=(("Euro", 114L))
+ ratesHistoryData.+=(("Yen", 1L))
+ ratesHistoryData.+=(("Euro", 116L))
+ ratesHistoryData.+=(("Euro", 119L))
+
+ val orders1 = env
+ .fromCollection(ordersData)
+ .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
+ val orders2 = env
+ .fromCollection(ordersData)
+ .toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
+ val ratesHistory = env
+ .fromCollection(ratesHistoryData)
+ .toTable(tEnv, 'currency, 'rate, 'proctime.proctime)
+
+ tEnv.registerTable("Orders1", orders1)
+ tEnv.registerTable("Orders2", orders2)
+ tEnv.registerTable("RatesHistory", ratesHistory)
+
+ tEnv.registerFunction(
+ "Rates",
+ ratesHistory.createTemporalTableFunction($"proctime", $"currency"))
+ tEnv.registerTable(
+ "Orders",
+ tEnv.sqlQuery("SELECT * FROM Orders1 UNION ALL SELECT * FROM Orders2"))
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new TestingAppendSink)
+ env.execute()
+ }
+
+ @Test
def testEventTimeInnerJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)