This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 824259ecf53 [FLINK-31424][table-planner] Fix NPE produced by multiple
sink with local-global window aggregate
824259ecf53 is described below
commit 824259ecf53fd82061d177b508594e6510e69060
Author: Jane Chan <[email protected]>
AuthorDate: Wed Mar 22 16:30:58 2023 +0800
[FLINK-31424][table-planner] Fix NPE produced by multiple sink with
local-global window aggregate
This closes #22222
---
.../plan/metadata/FlinkRelMdWindowProperties.scala | 40 ++-
.../table/planner/plan/stats/FlinkStatistic.scala | 6 +-
.../plan/stream/sql/join/WindowJoinTest.xml | 72 +++++
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 318 ++++++++++++++++++++-
.../metadata/FlinkRelMdWindowPropertiesTest.scala | 184 ++++++++++++
.../planner/plan/metadata/MetadataTestUtil.scala | 36 +++
.../plan/stream/sql/join/WindowJoinTest.scala | 60 ++++
7 files changed, 708 insertions(+), 8 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
index 30087585974..a688302e914 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.{JArrayList, JHashMap, JList}
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.WindowSpec
import org.apache.flink.table.planner.plan.nodes.calcite.{Expand,
WatermarkAssigner}
import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate,
FlinkLogicalCorrelate, FlinkLogicalJoin, FlinkLogicalRank}
import
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
@@ -27,6 +28,7 @@ import
org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
import
org.apache.flink.table.planner.plan.utils.WindowJoinUtil.satisfyWindowJoin
import
org.apache.flink.table.planner.plan.utils.WindowUtil.{convertToWindowingStrategy,
groupingContainsWindowStartEnd, isWindowTableFunctionCall}
import org.apache.flink.table.runtime.groupwindow._
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.plan.volcano.RelSubset
@@ -254,11 +256,34 @@ class FlinkRelMdWindowProperties private extends
MetadataHandler[FlinkMetadata.W
def getWindowProperties(
rel: StreamPhysicalWindowAggregate,
mq: RelMetadataQuery): RelWindowProperties = {
+ getWindowAggregateWindowProperties(
+ rel.grouping.length + rel.aggCalls.size(),
+ rel.namedWindowProperties,
+ rel.windowing.getWindow,
+ rel.windowing.getTimeAttributeType
+ )
+ }
+
+ def getWindowProperties(
+ rel: StreamPhysicalGlobalWindowAggregate,
+ mq: RelMetadataQuery): RelWindowProperties = {
+ getWindowAggregateWindowProperties(
+ rel.grouping.length + rel.aggCalls.size(),
+ rel.namedWindowProperties,
+ rel.windowing.getWindow,
+ rel.windowing.getTimeAttributeType
+ )
+ }
+
+ private def getWindowAggregateWindowProperties(
+ propertyOffset: Int,
+ windowProperties: Seq[NamedWindowProperty],
+ windowSpec: WindowSpec,
+ timeAttributeType: LogicalType): RelWindowProperties = {
val starts = ArrayBuffer[Int]()
val ends = ArrayBuffer[Int]()
val times = ArrayBuffer[Int]()
- val propertyOffset = rel.grouping.length + rel.aggCalls.size()
- rel.namedWindowProperties.map(_.getProperty).zipWithIndex.foreach {
+ windowProperties.map(_.getProperty).zipWithIndex.foreach {
case (p, index) =>
p match {
case _: WindowStart =>
@@ -275,11 +300,18 @@ class FlinkRelMdWindowProperties private extends
MetadataHandler[FlinkMetadata.W
ImmutableBitSet.of(starts: _*),
ImmutableBitSet.of(ends: _*),
ImmutableBitSet.of(times: _*),
- rel.windowing.getWindow,
- rel.windowing.getTimeAttributeType
+ windowSpec,
+ timeAttributeType
)
}
+ def getWindowProperties(
+ rel: StreamPhysicalLocalWindowAggregate,
+ mq: RelMetadataQuery): RelWindowProperties = {
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+ fmq.getRelWindowProperties(rel.getInput)
+ }
+
def getWindowProperties(
rel: StreamPhysicalWindowRank,
mq: RelMetadataQuery): RelWindowProperties = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
index 4d5b4ecf2f7..7158a8039c9 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.plan.stats
-import org.apache.flink.table.catalog.{ContextResolvedTable, ResolvedSchema,
UniqueConstraint}
+import org.apache.flink.table.catalog.{ResolvedSchema, UniqueConstraint}
import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
import org.apache.flink.table.planner.plan.`trait`.{RelModifiedMonotonicity,
RelWindowProperties}
@@ -29,7 +29,6 @@ import org.apache.calcite.util.ImmutableBitSet
import javax.annotation.Nullable
import java.util
-import java.util.{HashSet, Optional, Set}
import scala.collection.JavaConversions._
@@ -206,6 +205,7 @@ object FlinkStatistic {
this.tableStats = statistic.getTableStats
this.uniqueKeys = statistic.getUniqueKeys
this.relModifiedMonotonicity = statistic.getRelModifiedMonotonicity
+ this.windowProperties = statistic.getRelWindowProperties
this
}
@@ -218,7 +218,7 @@ object FlinkStatistic {
) {
UNKNOWN
} else {
- new FlinkStatistic(tableStats, uniqueKeys, relModifiedMonotonicity)
+ new FlinkStatistic(tableStats, uniqueKeys, relModifiedMonotonicity,
windowProperties)
}
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index 39a7abd3fd2..f94791d3b5a 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -154,6 +154,78 @@ WindowJoin(leftWindow=[TUMBLE(win_start=[window_start],
win_end=[window_end], si
+- LocalWindowAggregate(groupBy=[a],
window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, slice_end('w$)
AS $slice_end])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog,
default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinToMultiSink">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1],
fields=[window_start, window_end, user_id, dt, hour])
++- LogicalProject(window_start=[$1], window_end=[$2], user_id=[$0],
dt=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'yyyyMMdd')],
hour=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'HH')])
+ +- LogicalJoin(condition=[AND(=($0, $3), =($1, $4), =($2, $5))],
joinType=[left])
+ :- LogicalAggregate(group=[{0, 1, 2}])
+ : +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3),
60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id,
VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME*
event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+ : +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2],
event_time=[$3])
+ : +- LogicalWatermarkAssigner(rowtime=[event_time],
watermark=[$3])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, food_order]])
+ +- LogicalAggregate(group=[{0, 1, 2}])
+ +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3),
60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id,
VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME*
event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+ +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2],
event_time=[$3])
+ +- LogicalWatermarkAssigner(rowtime=[event_time],
watermark=[$3])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, food_order]])
+
+LogicalSink(table=[default_catalog.default_database.sink2],
fields=[window_start, window_end, user_id, dt, hour])
++- LogicalProject(window_start=[$1], window_end=[$2], user_id=[$0],
dt=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'yyyyMMdd')],
hour=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'HH')])
+ +- LogicalJoin(condition=[AND(=($0, $3), =($1, $4), =($2, $5))],
joinType=[left])
+ :- LogicalAggregate(group=[{0, 1, 2}])
+ : +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3),
60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id,
VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME*
event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+ : +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2],
event_time=[$3])
+ : +- LogicalWatermarkAssigner(rowtime=[event_time],
watermark=[$3])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, food_order]])
+ +- LogicalAggregate(group=[{0, 1, 2}])
+ +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3),
60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id,
VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME*
event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+ +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2],
event_time=[$3])
+ +- LogicalWatermarkAssigner(rowtime=[event_time],
watermark=[$3])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, food_order]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[window_start,
window_end, user_id, dt, hour])
++- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end,
25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end,
25200000:INTERVAL HOUR), 'HH') AS hour])
+ +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start],
win_end=[window_end], size=[1 min])],
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1
min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)],
select=[user_id, window_start, window_end, user_id0, window_start0,
window_end0])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- GlobalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id,
start('w$) AS window_start, end('w$) AS window_end])
+ : +- Exchange(distribution=[hash[user_id]])
+ : +- LocalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id,
slice_end('w$) AS $slice_end])
+ : +- WatermarkAssigner(rowtime=[event_time],
watermark=[event_time])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, food_order, project=[user_id, event_time], metadata=[]]],
fields=[user_id, event_time])
+ +- Exchange(distribution=[hash[user_id]])
+ +- GlobalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id,
start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[user_id]])
+ +- LocalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id,
slice_end('w$) AS $slice_end])
+ +- WatermarkAssigner(rowtime=[event_time],
watermark=[event_time])
+ +- TableSourceScan(table=[[default_catalog,
default_database, food_order, project=[user_id, event_time], metadata=[]]],
fields=[user_id, event_time])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[window_start,
window_end, user_id, dt, hour])
++- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end,
25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end,
25200000:INTERVAL HOUR), 'HH') AS hour])
+ +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start],
win_end=[window_end], size=[1 min])],
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1
min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)],
select=[user_id, window_start, window_end, user_id0, window_start0,
window_end0])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- GlobalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id,
start('w$) AS window_start, end('w$) AS window_end])
+ : +- Exchange(distribution=[hash[user_id]])
+ : +- LocalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id,
slice_end('w$) AS $slice_end])
+ : +- WatermarkAssigner(rowtime=[event_time],
watermark=[event_time])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, food_order, project=[user_id, event_time], metadata=[]]],
fields=[user_id, event_time])
+ +- Exchange(distribution=[hash[user_id]])
+ +- GlobalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id,
start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[user_id]])
+ +- LocalWindowAggregate(groupBy=[user_id],
window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id,
slice_end('w$) AS $slice_end])
+ +- WatermarkAssigner(rowtime=[event_time],
watermark=[event_time])
+ +- TableSourceScan(table=[[default_catalog,
default_database, food_order, project=[user_id, event_time], metadata=[]]],
fields=[user_id, event_time])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 5535fc0671e..48d2dc204e0 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.catalog.ContextResolvedFunction
import org.apache.flink.table.data.RowData
import org.apache.flink.table.expressions._
import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
@@ -25,6 +26,7 @@ import org.apache.flink.table.functions.{FunctionIdentifier,
UserDefinedFunction
import org.apache.flink.table.operations.TableSourceQueryOperation
import org.apache.flink.table.planner.calcite.{FlinkRelBuilder,
FlinkTypeFactory}
import org.apache.flink.table.planner.delegation.PlannerContext
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.functions.utils.AggSqlFunction
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution,
FlinkRelDistributionTraitDef}
@@ -39,6 +41,7 @@ import
org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase,
IntermediateRelTable, TableSourceTable}
import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable
import org.apache.flink.table.planner.plan.utils._
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions
import org.apache.flink.table.planner.utils.{PlannerMocks, Top3}
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
import org.apache.flink.table.runtime.groupwindow._
@@ -48,6 +51,7 @@ import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.utils.TypeConversions
import com.google.common.collect.{ImmutableList, Lists}
+import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan._
import org.apache.calcite.prepare.CalciteCatalogReader
@@ -60,7 +64,7 @@ import
org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQu
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.{SqlAggFunction, SqlWindow}
+import org.apache.calcite.sql.{SqlAggFunction, SqlIntervalQualifier, SqlWindow}
import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.calcite.sql.parser.SqlParserPos
@@ -197,6 +201,13 @@ class FlinkRelMdHandlerTestBase {
protected lazy val tableSourceTableNonKeyStreamScan:
StreamPhysicalDataStreamScan =
createTableSourceTable(ImmutableList.of("TableSourceTable3"),
streamPhysicalTraits)
+ protected lazy val temporalTableLogicalScan: LogicalTableScan =
+ createDataStreamScan(ImmutableList.of("TemporalTable4"), logicalTraits)
+ protected lazy val temporalTableFlinkLogicalScan:
FlinkLogicalDataStreamTableScan =
+ createDataStreamScan(ImmutableList.of("TemporalTable4"),
flinkLogicalTraits)
+ protected lazy val temporalTableStreamScan: StreamPhysicalDataStreamScan =
+ createDataStreamScan(ImmutableList.of("TemporalTable4"),
streamPhysicalTraits)
+
private lazy val valuesType = relBuilder.getTypeFactory
.builder()
.add("a", SqlTypeName.BIGINT)
@@ -3156,6 +3167,10 @@ class FlinkRelMdHandlerTestBase {
protected lazy val batchCumulateWindowTVFRel = createWindowTVFRel(false,
cumulateWindowSpec)
protected lazy val streamCumulateWindowTVFRel = createWindowTVFRel(true,
cumulateWindowSpec)
+ protected lazy val timeAttributeType = new TimestampType(true,
TimestampKind.ROWTIME, 3)
+ protected lazy val proctimeType = new LocalZonedTimestampType(true,
TimestampKind.PROCTIME, 3)
+ protected lazy val windowStartEndType = new TimestampType(false, 3)
+
protected def createWindowTVFRel(
isStreamingMode: Boolean,
windowSpec: WindowSpec): CommonPhysicalWindowTableFunction = {
@@ -3195,6 +3210,307 @@ class FlinkRelMdHandlerTestBase {
}
}
+ // equivalent SQL is
+ // SELECT * FROM
+ // TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(ptime), INTERVAL '10' MINUTE))
+ // CREATE TEMPORARY VIEW tmp AS
+ // SELECT `id`, `name`, PROCTIME() AS ptime FROM student
+ protected lazy val windowTableFunctionScan: TableFunctionScan =
createTableFunctionScan(true)
+
+ // equivalent SQL is
+ // SELECT `name`, `val` FROM student,
+ // LATERAL TABLE(STRING_SPLIT(`name`, CAST(`id` AS STRING))) AS T(`val`);
+ protected lazy val lateralTableFunctionScan: TableFunctionScan =
createTableFunctionScan(false)
+
+ protected def createTableFunctionScan(windowFunctionCall: Boolean):
TableFunctionScan = {
+ relBuilder.push(studentLogicalScan)
+
+ if (windowFunctionCall) {
+ val projects = List(
+ relBuilder.field(0),
+ relBuilder.field(1),
+ relBuilder.call(FlinkSqlOperatorTable.PROCTIME))
+ val outputRowType = typeFactory.buildRelNodeRowType(
+ Array("id", "name", "ptime"),
+ Array(
+ new BigIntType,
+ new VarCharType,
+ proctimeType
+ )
+ )
+ val calcOnStudentScan =
+ createLogicalCalc(
+ studentLogicalScan,
+ outputRowType,
+ projects,
+ null
+ )
+ new FlinkLogicalTableFunctionScan(
+ cluster,
+ logicalTraits,
+ ImmutableList.of(calcOnStudentScan),
+ relBuilder.call(
+ FlinkSqlOperatorTable.TUMBLE,
+ relBuilder.field(2),
+ relBuilder.call(FlinkSqlOperatorTable.DESCRIPTOR,
relBuilder.field(2)),
+ rexBuilder.makeIntervalLiteral(
+ bd(600000L),
+ new SqlIntervalQualifier(TimeUnit.MILLISECOND, null,
SqlParserPos.ZERO))
+ ),
+ null,
+ typeFactory.builder
+ .kind(outputRowType.getStructKind)
+ .addAll(outputRowType.getFieldList)
+ .add("window_start", SqlTypeName.TIMESTAMP, 3)
+ .add("window_end", SqlTypeName.TIMESTAMP, 3)
+ .add("window_time", outputRowType.getFieldList.get(2).getType)
+ .build,
+ null)
+ } else {
+ val correlVar = rexBuilder.makeCorrel(
+ typeFactory.buildRelNodeRowType(
+ Array("id", "name", "val"),
+ Array(
+ new BigIntType,
+ new VarCharType,
+ new VarCharType
+ )
+ ),
+ new CorrelationId(0))
+ val tableFunctionCall = relBuilder.call(
+ BridgingSqlFunction.of(
+ relBuilder.getCluster,
+ ContextResolvedFunction.temporary(
+ FunctionIdentifier.of("STRING_SPLIT"),
+ new JavaUserDefinedTableFunctions.StringSplit())),
+ rexBuilder.makeFieldAccess(correlVar, 1),
+ rexBuilder.makeCall(stringType, CAST,
List(rexBuilder.makeFieldAccess(correlVar, 0)))
+ )
+ new FlinkLogicalTableFunctionScan(
+ cluster,
+ logicalTraits,
+ ImmutableList.of(),
+ tableFunctionCall,
+ null,
+ typeFactory.buildRelNodeRowType(Array("EXPR$0"), Array(new
VarCharType)),
+ null
+ )
+ }
+ }
+
+ // equivalent SQL is
+ // SELECT b, window_end AS my_window_end, window_start FROM
+ // TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE))
+ protected lazy val keepWindowCalcOnTumbleWindowTVF: Calc =
+ createCalcOnWindowTVF(streamTumbleWindowTVFRel, true)
+
+ // equivalent SQL is
+ // SELECT b, CAST(window_end AS STRING) AS my_end, CAST(window_start AS
STRING) AS my_start FROM
+ // TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE))
+ protected lazy val discardWindowCalcOnTumbleWindowTVF: Calc =
+ createCalcOnWindowTVF(streamTumbleWindowTVFRel, false)
+ // equivalent SQL is
+ // SELECT b, window_end AS my_window_end, window_start FROM
+ // TABLE(HOP (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE, INTERVAL '1' HOUR))
+ protected lazy val keepWindowCalcOnHopWindowTVF: Calc =
+ createCalcOnWindowTVF(streamHopWindowTVFRel, true)
+
+ // equivalent SQL is
+ // SELECT b, CAST(window_end AS STRING) AS my_end, CAST(window_start AS
STRING) AS my_start FROM
+ // TABLE(HOP (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE, INTERVAL '1' HOUR))
+ protected lazy val discardWindowCalcOnHopWindowTVF: Calc =
+ createCalcOnWindowTVF(streamHopWindowTVFRel, false)
+ // equivalent SQL is
+ // SELECT b, window_end AS my_window_end, window_start FROM
+ // TABLE(CUMULATE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL
'10' MINUTE, INTERVAL '1' HOUR))
+ protected lazy val keepWindowCalcOnCumulateWindowTVF: Calc =
+ createCalcOnWindowTVF(streamCumulateWindowTVFRel, true)
+
+ // equivalent SQL is
+ // SELECT b, CAST(window_end AS STRING) AS my_end, CAST(window_start AS
STRING) AS my_start FROM
+ // TABLE(CUMULATE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL
'10' MINUTE, INTERVAL '1' HOUR))
+ protected lazy val discardWindowCalcOnCumulateWindowTVF: Calc =
+ createCalcOnWindowTVF(streamCumulateWindowTVFRel, false)
+ protected def createCalcOnWindowTVF(
+ tvf: CommonPhysicalWindowTableFunction,
+ projectKeepWindow: Boolean): Calc = {
+ relBuilder.push(tvf)
+ val (projects, outputType) = {
+ if (projectKeepWindow) {
+ (
+ List(relBuilder.field(1), relBuilder.field(6), relBuilder.field(5)),
+ typeFactory.buildRelNodeRowType(
+ Array("b", "my_window_end", "window_start"),
+ Array(
+ VarCharType.STRING_TYPE,
+ windowStartEndType,
+ windowStartEndType
+ )
+ ))
+ } else {
+ (
+ List(
+ relBuilder.field(1),
+ rexBuilder.makeCast(stringType, relBuilder.field(6)),
+ rexBuilder.makeCast(stringType, relBuilder.field(5))),
+ typeFactory.buildRelNodeRowType(
+ Array("b", "my_window_end", "window_start"),
+ Array(
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE,
+ VarCharType.STRING_TYPE
+ )
+ ))
+ }
+ }
+ val program = RexProgram.create(tvf.getRowType, projects, null,
outputType, rexBuilder)
+ new StreamPhysicalCalc(
+ cluster,
+ streamPhysicalTraits,
+ tvf,
+ program,
+ program.getOutputRowType
+ )
+ }
+
+ // equivalent SQL is
+ // SELECT * FROM
+ // TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE))
+ // UNION ALL
+ // (SELECT * FROM
+ // TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE)))
+ protected lazy val unionOnWindowTVFWithSameWindowSpec: Union =
+ createUnionOnWindowTVF(streamTumbleWindowTVFRel, streamTumbleWindowTVFRel)
+
+ // SELECT * FROM
+ // TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE))
+ // UNION ALL
+ // (SELECT * FROM
+ // TABLE(HOP (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE, INTERVAL '1' HOUR)))
+ protected lazy val unionOnWindowTVFWithDifferentWindowSpec: Union =
+ createUnionOnWindowTVF(streamTumbleWindowTVFRel, streamHopWindowTVFRel)
+
+ protected def createUnionOnWindowTVF(
+ tvf1: CommonPhysicalWindowTableFunction,
+ tvf2: CommonPhysicalWindowTableFunction): Union = {
+ new StreamPhysicalUnion(cluster, streamPhysicalTraits, List(tvf1, tvf2),
true, tvf1.getRowType)
+ }
+
+ // hash by field a
+ protected lazy val hashOnTumbleWindowTVF =
createExchangeOnWindowTVF(streamTumbleWindowTVFRel)
+ protected lazy val hashOnHopWindowTVF =
createExchangeOnWindowTVF(streamHopWindowTVFRel)
+ protected lazy val hashOnCumulateWindowTVF =
createExchangeOnWindowTVF(streamCumulateWindowTVFRel)
+
+ protected def createExchangeOnWindowTVF(tvf:
CommonPhysicalWindowTableFunction): Exchange = {
+ val hash = FlinkRelDistribution.hash(Array(0), requireStrict = true)
+ new StreamPhysicalExchange(
+ cluster,
+ streamPhysicalTraits.replace(hash),
+ tvf,
+ hash
+ )
+ }
+
+ // equivalent SQL is
+ // CREATE TEMPORARY VIEW tmp AS
+ // SELECT `id`, `name`, PROCTIME() AS ptime FROM student
+ // SELECT `id`, `window_start`, `window_end`, COUNT(DISTINCT `name`) AS cnt
FROM
+ // (TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(ptime), INTERVAL '10' MINUTE)))
alias GROUP BY `id`, `window_start`, `window_end`
+ protected lazy val logicalGroupWindowAggOnTumbleWindowTVF =
createLogicalAggregateOnWindowTVF(
+ true)
+
+ // equivalent SQL is
+ // CREATE TEMPORARY VIEW tmp AS
+ // SELECT `id`, `name`, PROCTIME() AS ptime FROM student
+ // SELECT `id`, COUNT(DISTINCT `name`) AS cnt FROM
+ // (TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(ptime), INTERVAL '10' MINUTE)))
alias GROUP BY `id`
+ protected lazy val logicalGroupAggOnTumbleWindowTVF =
createLogicalAggregateOnWindowTVF(false)
+ protected def createLogicalAggregateOnWindowTVF(groupByWindow: Boolean):
FlinkLogicalAggregate = {
+ relBuilder.push(windowTableFunctionScan)
+ val groupKey =
+ if (groupByWindow)
+ List(relBuilder.field(0), relBuilder.field(3), relBuilder.field(4))
+ else List(relBuilder.field(0))
+ val logicalAgg =
+ relBuilder
+ .aggregate(
+ relBuilder.groupKey(groupKey),
+ relBuilder.count(true, "cnt", relBuilder.field(1)))
+ .build()
+ .asInstanceOf[LogicalAggregate]
+ new FlinkLogicalAggregate(
+ cluster,
+ flinkLogicalTraits,
+ windowTableFunctionScan,
+ logicalAgg.getGroupSet,
+ logicalAgg.getGroupSets,
+ logicalAgg.getAggCallList
+ )
+ }
+
+ // equivalent SQL is
+ // SELECT window_start, window_end, SUM(a) AS sum_a, COUNT(b) AS cnt_b FROM
+ // TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10'
MINUTE))
+ // GROUP BY window_start, window_end
+ protected lazy val (
+ streamWindowAggOnWindowTVF,
+ streamLocalWindowAggOnWindowTVF,
+ streamGlobalWindowAggOnWindowTVF) = {
+ val logicalAgg =
+ relBuilder
+ .push(streamTumbleWindowTVFRel)
+ .aggregate(
+ relBuilder.groupKey(relBuilder.field(5), relBuilder.field(6)),
+ relBuilder.sum(false, "sum_a", relBuilder.field(0)),
+ relBuilder.count(false, "cnt_b", relBuilder.field(1))
+ )
+ .build()
+ .asInstanceOf[LogicalAggregate]
+
+ val windowRef = new WindowReference("w$", timeAttributeType)
+ val namedWindowProperties: Seq[NamedWindowProperty] = Seq(
+ new NamedWindowProperty("window_start", new WindowStart(windowRef)),
+ new NamedWindowProperty("window_end", new WindowEnd(windowRef)))
+ val traitSet = streamPhysicalTraits.replace(FlinkRelDistribution.SINGLETON)
+ val streamWindowAggOnWindowTVF = new StreamPhysicalWindowAggregate(
+ cluster,
+ traitSet,
+ streamTumbleWindowTVFRel,
+ Array(),
+ logicalAgg.getAggCallList,
+ new WindowAttachedWindowingStrategy(tumbleWindowSpec, timeAttributeType,
5, 6),
+ namedWindowProperties)
+
+ val streamLocalWindowAggOnWindowTVF = new
StreamPhysicalLocalWindowAggregate(
+ cluster,
+ traitSet,
+ streamTumbleWindowTVFRel,
+ streamWindowAggOnWindowTVF.grouping,
+ streamWindowAggOnWindowTVF.aggCalls,
+ streamWindowAggOnWindowTVF.windowing)
+
+ val exchange = new StreamPhysicalExchange(
+ cluster,
+ traitSet,
+ streamLocalWindowAggOnWindowTVF,
+ FlinkRelDistribution.SINGLETON)
+ val globalWindowing = new SliceAttachedWindowingStrategy(
+ tumbleWindowSpec,
+ timeAttributeType,
+ streamLocalWindowAggOnWindowTVF.getRowType.getFieldCount - 1)
+ val streamGlobalWindowAggOnWindowTVF = new
StreamPhysicalGlobalWindowAggregate(
+ cluster,
+ traitSet,
+ exchange,
+ streamTumbleWindowTVFRel.getRowType,
+ Array(),
+ streamWindowAggOnWindowTVF.aggCalls,
+ globalWindowing,
+ namedWindowProperties)
+
+ (streamWindowAggOnWindowTVF, streamLocalWindowAggOnWindowTVF,
streamGlobalWindowAggOnWindowTVF)
+ }
+
// select * from TableSourceTable1
// left join TableSourceTable2 on TableSourceTable1.b = TableSourceTable2.b
protected lazy val logicalLeftJoinOnContainedUniqueKeys: RelNode = relBuilder
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowPropertiesTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowPropertiesTest.scala
new file mode 100644
index 00000000000..3ef62d0ff43
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowPropertiesTest.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.metadata
+
+import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.WindowSpec
+import org.apache.flink.table.types.logical.LogicalType
+
+import org.apache.calcite.util.ImmutableBitSet
+import org.junit.Assert._
+import org.junit.Test
+
+/** Test for [[FlinkRelMdWindowProperties]]. */
+class FlinkRelMdWindowPropertiesTest extends FlinkRelMdHandlerTestBase {
+
+ @Test
+ def testGetWindowPropertiesOnTableScan(): Unit = {
+ // get window properties from statistics
+ Array(studentLogicalScan, studentFlinkLogicalScan,
studentStreamScan).foreach(
+ scan => assertEquals(null, mq.getRelWindowProperties(scan)))
+
+ Array(temporalTableLogicalScan, temporalTableFlinkLogicalScan,
temporalTableStreamScan).foreach(
+ scan =>
+ assertEquals(
+ createRelWindowProperties(0, 1, 2, tumbleWindowSpec,
timeAttributeType),
+ mq.getRelWindowProperties(scan)
+ ))
+ }
+
+ @Test
+ def testGetWindowPropertiesOnTableFunctionScan(): Unit = {
+ Array(windowTableFunctionScan,
lateralTableFunctionScan).zipWithIndex.foreach {
+ case (scan, idx) =>
+ assertEquals(
+ Array(createRelWindowProperties(3, 4, 5, tumbleWindowSpec,
proctimeType), null).apply(
+ idx),
+ mq.getRelWindowProperties(scan)
+ )
+ }
+ }
+
+ @Test
+ def testGetWindowPropertiesOnWindowTableFunction(): Unit = {
+ Array(streamTumbleWindowTVFRel, streamHopWindowTVFRel,
streamCumulateWindowTVFRel).zipWithIndex
+ .foreach {
+ case (tvf, idx) =>
+ assertEquals(
+ createRelWindowProperties(
+ 5,
+ 6,
+ 7,
+ Array(tumbleWindowSpec, hopWindowSpec,
cumulateWindowSpec).apply(idx),
+ timeAttributeType),
+ mq.getRelWindowProperties(tvf)
+ )
+ }
+ }
+
+ @Test
+ def testGetWindowPropertiesOnCalc(): Unit = {
+ Array(
+ keepWindowCalcOnTumbleWindowTVF,
+ keepWindowCalcOnHopWindowTVF,
+ keepWindowCalcOnCumulateWindowTVF).zipWithIndex
+ .foreach {
+ case (calc, idx) =>
+ assertEquals(
+ createRelWindowProperties(
+ 2,
+ 1,
+ -1,
+ Array(tumbleWindowSpec, hopWindowSpec,
cumulateWindowSpec).apply(idx),
+ proctimeType),
+ mq.getRelWindowProperties(calc)
+ )
+ }
+ Array(
+ discardWindowCalcOnTumbleWindowTVF,
+ discardWindowCalcOnHopWindowTVF,
+ discardWindowCalcOnCumulateWindowTVF)
+ .foreach(
+ calc =>
+ assertEquals(
+ null,
+ mq.getRelWindowProperties(calc)
+ ))
+ }
+
+ @Test
+ def testGetWindowPropertiesOnUnion(): Unit = {
+ Array(unionOnWindowTVFWithSameWindowSpec,
unionOnWindowTVFWithDifferentWindowSpec).zipWithIndex
+ .foreach {
+ case (union, idx) =>
+ assertEquals(
+ Array(createRelWindowProperties(5, 6, 7, tumbleWindowSpec,
timeAttributeType), null)
+ .apply(idx),
+ mq.getRelWindowProperties(union)
+ )
+ }
+ }
+
+ @Test
+ def testGetWindowPropertiesOnExchange(): Unit = {
+ Array(hashOnTumbleWindowTVF, hashOnHopWindowTVF,
hashOnCumulateWindowTVF).zipWithIndex.foreach {
+ case (exchange, idx) =>
+ assertEquals(
+ createRelWindowProperties(
+ 5,
+ 6,
+ 7,
+ Array(
+ tumbleWindowSpec,
+ hopWindowSpec,
+ cumulateWindowSpec
+ ).apply(idx),
+ timeAttributeType),
+ mq.getRelWindowProperties(exchange)
+ )
+ }
+ }
+
+ @Test
+ def testGetWindowPropertiesOnLogicalAggregate(): Unit = {
+ Array(logicalGroupWindowAggOnTumbleWindowTVF,
logicalGroupAggOnTumbleWindowTVF).zipWithIndex
+ .foreach {
+ case (groupAgg, idx) =>
+ assertEquals(
+ Array(
+ createRelWindowProperties(1, 2, -1, tumbleWindowSpec,
proctimeType),
+ null
+ ).apply(idx),
+ mq.getRelWindowProperties(groupAgg)
+ )
+ }
+ }
+
+ @Test
+ def testGetWindowPropertiesOnPhysicalAggregate(): Unit = {
+ Array(
+ streamWindowAggOnWindowTVF,
+ streamLocalWindowAggOnWindowTVF,
+ streamGlobalWindowAggOnWindowTVF).zipWithIndex.foreach {
+ case (groupAgg, idx) =>
+ assertEquals(
+ Array(
+ createRelWindowProperties(2, 3, -1, tumbleWindowSpec,
timeAttributeType),
+ createRelWindowProperties(5, 6, 7, tumbleWindowSpec,
timeAttributeType),
+ createRelWindowProperties(2, 3, -1, tumbleWindowSpec,
timeAttributeType)
+ ).apply(idx),
+ mq.getRelWindowProperties(groupAgg)
+ )
+ }
+ }
+
+ private def createRelWindowProperties(
+ start: Int,
+ end: Int,
+ time: Int,
+ spec: WindowSpec,
+ timeAttributeType: LogicalType): RelWindowProperties = {
+ RelWindowProperties.create(
+ ImmutableBitSet.of(start),
+ ImmutableBitSet.of(end),
+ if (time >= 0) ImmutableBitSet.of(time) else ImmutableBitSet.of(),
+ spec,
+ timeAttributeType)
+ }
+
+}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 671c4b854c9..af245270093 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -25,6 +25,8 @@ import
org.apache.flink.table.connector.source.{DynamicTableSource, ScanTableSou
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl,
FlinkTypeFactory}
+import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase,
TableSourceTable}
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -37,7 +39,9 @@ import org.apache.calcite.rel.`type`.{RelDataType,
RelDataTypeFactory}
import org.apache.calcite.schema.{Schema, SchemaPlus, Table}
import org.apache.calcite.schema.Schema.TableType
import org.apache.calcite.sql.{SqlCall, SqlNode}
+import org.apache.calcite.util.ImmutableBitSet
+import java.time.Duration
import java.util
import java.util.Collections
@@ -57,6 +61,7 @@ object MetadataTestUtil {
rootSchema.add("TemporalTable1", createTemporalTable1())
rootSchema.add("TemporalTable2", createTemporalTable2())
rootSchema.add("TemporalTable3", createTemporalTable3())
+ rootSchema.add("TemporalTable4", createTemporalTable4())
rootSchema.add("TableSourceTable1", createTableSourceTable1())
rootSchema.add("TableSourceTable2", createTableSourceTable2())
rootSchema.add("TableSourceTable3", createTableSourceTable3())
@@ -265,6 +270,37 @@ object MetadataTestUtil {
getMetadataTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats))
}
+ private def createTemporalTable4(): Table = {
+ val fieldNames = Array("window_start", "window_end", "window_time", "a",
"b", "c")
+ val fieldTypes = Array[LogicalType](
+ new TimestampType(false, 3),
+ new TimestampType(false, 3),
+ new TimestampType(true, TimestampKind.ROWTIME, 3),
+ new IntType(),
+ new BigIntType(),
+ VarCharType.STRING_TYPE
+ )
+
+ val windowProperties = RelWindowProperties.create(
+ ImmutableBitSet.of(0),
+ ImmutableBitSet.of(1),
+ ImmutableBitSet.of(2),
+ new TumblingWindowSpec(Duration.ofMinutes(10L), null),
+ fieldTypes.apply(2))
+
+ val colStatsMap = Map[String, ColumnStats](
+ "a" -> new ColumnStats(3740000000L, 0L, 4d, 4, null, null),
+ "b" -> new ColumnStats(53252726L, 1474L, 8d, 8, 100000000L, -100000000L),
+ "c" -> new ColumnStats(null, 0L, 18.6, 64, null, null)
+ )
+
+ val tableStats = new TableStats(4000000000L, colStatsMap)
+ getMetadataTable(
+ fieldNames,
+ fieldTypes,
+ new FlinkStatistic(tableStats, relWindowProperties = windowProperties))
+ }
+
private val flinkContext = new FlinkContextImpl(
false,
TableConfig.getDefault,
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
index 9788791e781..9fe0b3e7fbf 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
@@ -1273,4 +1273,64 @@ class WindowJoinTest extends TableTestBase {
""".stripMargin
util.verifyRelPlan(sql)
}
+
+ @Test
+ def testJoinToMultiSink(): Unit = {
+ val sourceDdl =
+ """
+ |CREATE TABLE food_order (
+ | user_id STRING,
+ | order_id STRING,
+ | amount INT,
+ | event_time TIMESTAMP(3),
+ | WATERMARK FOR event_time AS event_time
+ |) WITH (
+ |'connector' = 'values')
+ |""".stripMargin
+ util.tableEnv.executeSql(sourceDdl)
+
+ val query =
+ """
+ |CREATE TEMPORARY VIEW food_view AS
+ |WITH food AS (
+ | SELECT user_id,
+ | window_start,
+ | window_end
+ | FROM TABLE(TUMBLE(TABLE food_order, DESCRIPTOR(event_time),
INTERVAL '1' MINUTES))
+ | GROUP BY
+ | user_id,
+ | window_start,
+ | window_end)
+ |SELECT food.window_start
+ | ,food.window_end
+ | ,food.user_id
+ | ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'yyyyMMdd') AS
dt
+ | ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'HH') AS `hour`
+ |FROM food
+ |LEFT JOIN food AS a ON food.user_id = a.user_id
+ |AND food.window_start = a.window_start
+ |AND food.window_end = a.window_end
+ |""".stripMargin
+
+ util.tableEnv.executeSql(query)
+
+ val sinkDdl =
+ """
+ |CREATE TABLE %s (
+ | window_start TIMESTAMP(3),
+ | window_end TIMESTAMP(3),
+ | user_id STRING,
+ | dt STRING,
+ | `hour` STRING
+ |) WITH (
+ | 'connector' = 'values')
+ |""".stripMargin
+ util.tableEnv.executeSql(sinkDdl.format("sink1"))
+ util.tableEnv.executeSql(sinkDdl.format("sink2"))
+
+ val statementSet = util.tableEnv.createStatementSet()
+ statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM food_view")
+ statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM food_view")
+ util.verifyRelPlan(statementSet)
+ }
}