beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r678085882
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
##########
@@ -327,9 +328,17 @@ abstract class LogicalWindowAggregateRuleBase(description:
String)
windowExpression: RexCall): RexNode
/** Returns the expression that replaces the window expression after the
aggregation. */
- private[table] def getOutAggregateGroupExpression(
+ private def getOutAggregateGroupExpression(
rexBuilder: RexBuilder,
- windowExpression: RexCall): RexNode
+ windowExpression: RexCall): RexNode = {
+ val zeroLiteral = rexBuilder.makeZeroLiteral(windowExpression.getType)
+ if (isTimeIndicatorType(windowExpression.getType)) {
+ // cast zero literal to time indicator field
Review comment:
In the previous version this could not happen, because the rowtime
indicator in the group key would have been materialized into a regular timestamp.
But now that time indicator materialization is moved to after `logical_rewrite`, the rule
may encounter the window expression in the group key.
It's safe to simply cast the literal to the time indicator type, because the
window expression column in the group key will be projected out in the successor
Project node.
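For readers without the full diff, a minimal sketch of how this branch might look, assuming Calcite's `RexBuilder.makeAbstractCast` and Flink's `FlinkTypeFactory.isTimeIndicatorType` helper (the actual implementation in the PR may differ):

```scala
import org.apache.calcite.rex.{RexBuilder, RexCall, RexNode}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory.isTimeIndicatorType

// Sketch only: replace the window expression in the output group key with a
// zero literal, keeping the time indicator type so the row type stays stable.
private def getOutAggregateGroupExpression(
    rexBuilder: RexBuilder,
    windowExpression: RexCall): RexNode = {
  val zeroLiteral = rexBuilder.makeZeroLiteral(windowExpression.getType)
  if (isTimeIndicatorType(windowExpression.getType)) {
    // cast the zero literal to the time indicator type; the column is later
    // projected out by the successor Project node anyway
    rexBuilder.makeAbstractCast(windowExpression.getType, zeroLiteral)
  } else {
    zeroLiteral
  }
}
```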
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -255,7 +255,7 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false,
leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1,
rightTimeIndex=2], where=[((a = a0) AND (PROCTIME_MATERIALIZE(proctime) >=
(PROCTIME_MATERIALIZE(proctime0) - 3600000:INTERVAL HOUR)) AND
(PROCTIME_MATERIALIZE(proctime) <= (PROCTIME_MATERIALIZE(proctime0) +
3600000:INTERVAL HOUR)))], select=[a, proctime, a0, b, proctime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false,
leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1,
rightTimeIndex=2], where=[((a = a0) AND (proctime >= (proctime0 -
3600000:INTERVAL HOUR)) AND (proctime <= (proctime0 + 3600000:INTERVAL
HOUR)))], select=[a, proctime, a0, b, proctime0])
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
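To make the change concrete, the affected queries look roughly like the following (a hedged sketch; table and column names are illustrative, not taken from the test): the time bounds start out in the WHERE clause, predicate pushdown moves them into the join condition, and the join-condition conversion in `RelTimeIndicatorConverter` now leaves the time attributes unmaterialized (no `CAST`/`PROCTIME_MATERIALIZE` wrappers), where the old filter-condition conversion materialized them:

```scala
// Hypothetical query; identifiers are illustrative only.
val sql =
  """
    |SELECT t1.a, t2.b
    |FROM LeftTable t1, RightTable t2
    |WHERE t1.a = t2.a
    |  AND t1.proctime >= t2.proctime - INTERVAL '1' HOUR
    |  AND t1.proctime <= t2.proctime + INTERVAL '1' HOUR
    |""".stripMargin
```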
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
##########
@@ -158,12 +158,12 @@ LogicalProject(a=[$0], e=[$5], lptime=[$3])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, e, PROCTIME_MATERIALIZE(lptime) AS lptime])
-+- Join(joinType=[InnerJoin], where=[((a = d) AND ($f4 = $f40))], select=[a,
lptime, $f4, d, e, $f40], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[a, $f4]])
- : +- Calc(select=[a, lptime, PROCTIME_MATERIALIZE(lptime) AS $f4])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false,
leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2],
where=[((a = d) AND (lptime = rptime))], select=[a, lptime, d, e, rptime])
Review comment:
The xml is changed because after refactoring time indicator
materialization, the two cases are translated to `IntervalJoin` as expected.
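For context, the join in question is an equi-join on two proctime attributes, roughly of the following shape (this test lives under `stream/table`, so the SQL below is only an illustrative rendering, not the actual test code); with the refactored materialization it is now recognized as an interval join with zero bounds instead of a regular stream join on materialized timestamps:

```scala
// Hypothetical SQL rendering; identifiers are illustrative only.
val sql =
  """
    |SELECT t1.a, t2.e, t1.lptime
    |FROM LeftTable t1 JOIN RightTable t2
    |  ON t1.a = t2.d AND t1.lptime = t2.rptime
    |""".stripMargin
```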
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
##########
@@ -230,7 +230,6 @@ class IntervalJoinTest extends TableTestBase {
@Test
def testJoinWithEquiProcTime(): Unit = {
- // TODO: this should be translated into window join
Review comment:
done
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##########
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
rowtime=[$4], rowNum=[$5])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime,
1:BIGINT AS rowNum])
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime,
1:BIGINT AS $5])
Review comment:
The xml is changed because after applying `ProjectToWindowRule`, the newly
generated LogicalWindow contains a field named `w0$o0`.
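For context, the test query is a deduplication query of roughly this shape (a hedged sketch; identifiers are illustrative): Calcite's `ProjectToWindowRule` rewrites the `ROW_NUMBER()` projection into a `LogicalWindow`, and the generated window output column carries a synthetic name, which is why the alias in the final Calc changes:

```scala
// Hypothetical deduplication query; identifiers are illustrative only.
val sql =
  """
    |SELECT a, b, c, proctime, rowtime, rowNum
    |FROM (
    |  SELECT *,
    |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime) AS rowNum
    |  FROM MyTable)
    |WHERE rowNum = 1
    |""".stripMargin
```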
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
##########
@@ -76,7 +76,7 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL],
select=[b, FIRST_VALUE_RET
+- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D])
+- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b,
$f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3,
COUNT_RETRACT(DISTINCT c) AS $f4], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA,D])
- +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2],
changelogMode=[I,UB,UA,D])
+ +- Calc(select=[$f1 AS b, $f2 AS c, MOD(HASH_CODE($f2), 1024) AS
$f2], changelogMode=[I,UB,UA,D])
Review comment:
The xml is changed because after the `logical` phase, a `FlinkLogicalCalc` is
generated whose input node's row type differs from the `inputRowType` of its
`RexProgram`.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
##########
@@ -117,9 +117,9 @@ LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2])
Match(partitionBy=[symbol], orderBy=[matchRowtime ASC],
measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime],
rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW],
pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP,
86400000:INTERVAL DAY))}])
+- Exchange(distribution=[hash[symbol]])
+- Calc(select=[symbol, matchRowtime, price, w$start AS startTime])
- +- GroupWindowAggregate(groupBy=[symbol, matchRowtime, price],
window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[symbol, matchRowtime, price, start('w$)
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
- +- Exchange(distribution=[hash[symbol, matchRowtime, price]])
- +- Calc(select=[symbol, CAST(matchRowtime) AS matchRowtime, price,
matchRowtime AS matchRowtime0])
+ +- GroupWindowAggregate(groupBy=[symbol, price, matchRowtime],
window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[symbol, price, matchRowtime, start('w$)
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
Review comment:
The xml is changed because after the `logical` phase, projects that contain
redundant expressions are simplified.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
##########
@@ -238,7 +238,7 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL],
select=[b, FIRST_VALUE_RET
+- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D])
+- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b,
$f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3,
COUNT_RETRACT(DISTINCT c) AS $f4], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA,D])
- +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2],
changelogMode=[I,UB,UA,D])
+ +- Calc(select=[$f1 AS b, $f2 AS c, MOD(HASH_CODE($f2), 1024) AS
$f2], changelogMode=[I,UB,UA,D])
Review comment:
The xml is changed because after the `logical` phase, a `FlinkLogicalCalc` is
generated whose input node's row type differs from the `inputRowType` of its
`RexProgram`.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
##########
@@ -267,9 +267,9 @@ LogicalProject(EXPR$0=[$2], long=[$0])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[EXPR$0, long])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$,
rowtime, 100)], select=[long, MIN(rowtime0) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$,
rowtime, 100)], select=[long, MIN(rowtime) AS EXPR$0])
Review comment:
The xml is changed because after the `project_rewrite` phase, the identity
`project` is removed. The new project is generated by
`RelTimeIndicatorConverter` when it materializes the `Aggregate` node.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
##########
@@ -212,8 +212,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, e, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, window_start, window_end, window_time, rownum])
-+- WindowRank(window=[CUMULATE(win_start=[window_start], win_end=[window_end],
max_size=[1 h], step=[10 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
rankEnd=3], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, d, e, rowtime,
proctime, window_start, window_end, window_time, rownum])
+Calc(select=[a, b, c, d, e, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, window_start, window_end, window_time, w0$o0])
Review comment:
The xml is changed because after applying `ProjectToWindowRule`, the newly
generated LogicalWindow contains a field named `w0$o0`.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -42,7 +42,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime,
21600000)], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[rowtime, b, a0, b0])
- +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2,
rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) -
600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) +
3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
+ +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2,
rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL
MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, b,
rowtime, a0, b0, rowtime0])
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
##########
@@ -138,11 +138,11 @@ Calc(select=[a, b, $f3 AS EXPR$2, $f4 AS EXPR$3, $f5 AS
uv])
+- Calc(select=[a, b, window_start, window_end, $e, $f8, $f9, $f5 AS
$f7, $f6 AS $f8_0, $f7 AS $f9_0])
+- WindowAggregate(groupBy=[a, b, $e, $f8, $f9],
window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])],
select=[a, b, $e, $f8, $f9, COUNT(*) FILTER $g_3 AS $f5, MAX(d) FILTER $g_1 AS
$f6, COUNT(DISTINCT window_time) FILTER $g_2 AS $f7, start('w$) AS
window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a, b, $e, $f8, $f9]])
- +- Calc(select=[a, b, window_start, window_end, d, $f5,
window_time, $e, $f8, $f9, =($e_0, 3) AS $g_3, AND(=($e_0, 1), $f5) AS $g_1,
=($e_0, 2) AS $g_2])
+ +- Calc(select=[a, b, window_start, window_end, d, $f5,
CAST(window_time) AS window_time, $e, $f8, $f9, =($e_0, 3) AS $g_3, AND(=($e_0,
1), $f5) AS $g_1, =($e_0, 2) AS $g_2])
Review comment:
The xml is changed because after the `project_rewrite` phase, the identity
project is removed.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
##########
@@ -16,6 +16,37 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testAggFilterClauseBothWithAvgAndCount">
Review comment:
The xml is changed because after the `logical` phase, a `FlinkLogicalCalc` is
generated whose input node's row type differs from the `inputRowType` of its
`RexProgram`.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
##########
@@ -433,7 +433,7 @@
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a,
+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$,
rowtime, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1],
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network,
memory}
+- Exchange(distribution=[hash[id1]], changelogMode=[I]): rowcount = ,
cumulative cost = {rows, cpu, io, network, memory}
+- Calc(select=[id1, rowtime, text, _UTF-16LE'#' AS $f3],
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network,
memory}
- +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2,
rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0),
300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL
MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0],
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network,
memory}
+ +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2,
rightTimeIndex=4], where=[AND(=(id1, id2), >(rowtime, -(rowtime0,
300000:INTERVAL MINUTE)), <(rowtime, +(rowtime0, 180000:INTERVAL MINUTE)))],
select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0],
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network,
memory}
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -314,12 +314,12 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- Join(joinType=[InnerJoin], where=[((a = a0) AND ($f5 = $f50))], select=[a,
$f5, a0, b, $f50], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[a, $f5]])
- : +- Calc(select=[a, PROCTIME_MATERIALIZE(proctime) AS $f5])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false,
leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2],
where=[((a = a0) AND (proctime = proctime0))], select=[a, proctime, a0, b,
proctime0])
Review comment:
The xml is changed because after refactoring time indicator
materialization, the two cases are translated to `IntervalJoin` as expected.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -440,7 +440,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow('w$, rowtime0,
21600000)], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
+- Exchange(distribution=[hash[b0]])
+- Calc(select=[rowtime0, b0, a, b])
- +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2,
rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) -
600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) +
3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
+ +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2,
rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL
MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, b,
rowtime, a0, b0, rowtime0])
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
##########
@@ -522,7 +522,7 @@ LogicalProject(amount=[$0], currency=[$1], rowtime=[$2],
proctime=[$3], currency
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, currency0, rate, PROCTIME_MATERIALIZE(proctime0) AS proctime0])
+Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, currency0, rate, PROCTIME_MATERIALIZE($2) AS $2])
Review comment:
The xml is changed because after the `project_rewrite` phase, the identity
project is removed.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -107,12 +107,12 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- Join(joinType=[InnerJoin], where=[((a = a0) AND (rowtime0 = rowtime00))],
select=[a, rowtime0, a0, b, rowtime00], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[a, rowtime0]])
- : +- Calc(select=[a, CAST(rowtime) AS rowtime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2],
where=[((a = a0) AND (rowtime = rowtime0))], select=[a, rowtime, a0, b,
rowtime0])
Review comment:
The xml is changed because after refactoring time indicator
materialization, the two cases are translated to `IntervalJoin` as expected.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
##########
@@ -357,7 +357,7 @@
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a,
+- GroupWindowAggregate(groupBy=[id1],
window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end,
w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
+- Exchange(distribution=[hash[id1]])
+- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
- +- IntervalJoin(joinType=[InnerJoin],
windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999,
leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime),
-(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0),
180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt,
name, goods])
+ +- IntervalJoin(joinType=[InnerJoin],
windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999,
leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(rowtime,
-(rowtime0, 300000:INTERVAL MINUTE)), <(rowtime, +(rowtime0, 180000:INTERVAL
MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
##########
@@ -70,7 +70,7 @@ LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7,
300000:INTERVAL DAY TO SECO
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3],
where=[((a = d) AND (CAST(lrtime) >= (CAST(rrtime) - 300000:INTERVAL DAY TO
SECOND)) AND (CAST(lrtime) < CAST(rrtime)) AND (CAST(lrtime) > f))], select=[a,
b, c, lrtime, d, e, f, rrtime])
+IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3],
where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND
(lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime])
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
##########
@@ -242,7 +241,6 @@ class IntervalJoinTest extends TableTestBase {
@Test
def testJoinWithEquiRowTime(): Unit = {
- // TODO: this should be translated into window join
Review comment:
done
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml
##########
@@ -16,6 +16,42 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testUnionDiffRowTime">
Review comment:
The xml is changed because the plan of `UnionTest#testUnionDiffRowTime`
can be generated now.
In the `DEFAULT_REWRITE` phase, after applying
`CoerceInputsRule(classOf[LogicalUnion])`, the optimizer pre-casts the inputs to
a common type so that the inputs of the union set operator all have the same row type.
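As a rough sketch of that rule application (the surrounding rule-set name below is illustrative, not the exact Flink program definition), Calcite's `CoerceInputsRule` inserts casts on the inputs of the matched operator:

```scala
import org.apache.calcite.rel.logical.LogicalUnion
import org.apache.calcite.rel.rules.CoerceInputsRule
import org.apache.calcite.tools.RuleSets

// Sketch: registering the rule that pre-casts all inputs of a LogicalUnion
// to the union's common row type, so both branches end up with the same
// field types before further rewriting.
val unionCoercionRules = RuleSets.ofList(
  new CoerceInputsRule(classOf[LogicalUnion], false))
```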
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
##########
@@ -112,9 +112,6 @@ class UnionTest extends TableTestBase {
@Test
def testUnionDiffRowTime(): Unit = {
- expectedException.expectMessage(
Review comment:
`UnionTest#testUnionDiffRowTime` works fine now.
In the `DEFAULT_REWRITE` phase, after applying
`CoerceInputsRule(classOf[LogicalUnion])`, the optimizer pre-casts the inputs to a
common type so that the inputs of the union set operator all have the same row type.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -557,7 +557,7 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1,
rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) -
600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) +
3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1,
rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL
MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a,
rowtime, a0, b, rowtime0])
Review comment:
The xml is changed because after the `predicate_pushdown` phase the filter
is pushed down into the join condition, and the join condition conversion
logic in `RelTimeIndicatorConverter` differs from the filter condition
conversion used in the previous version.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRuleTest.scala
##########
@@ -35,7 +35,7 @@ class TemporalJoinRewriteWithUniqueKeyRuleTest extends
TableTestBase {
@Before
def setup(): Unit = {
- util.buildStreamProgram(LOGICAL_REWRITE)
+ util.buildStreamProgram(PHYSICAL)
Review comment:
To include the `TIME_INDICATOR` phase.