This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06489d1982e83e4890f69cfa3b492def38bb3dfd Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 21:11:38 2020 +0800 [FLINK-14338][table-planner-blink] Plan verify changes from DIGEST to EXPLAIN * Because of CALCITE-3713, the project names was removed from plan DIGEST, thus, the DIGEST plan has less info that EXPLAIN, we switch to EXPLAIN for plan verification; * The CALC window fields name was also changes with CALCITE-3713, which is acceptable because the old name was also un-readable, the new name was based on field index, not bad. --- .../catalog/QueryOperationCatalogViewTable.java | 6 +- .../table/planner/calcite/FlinkRelBuilder.scala | 9 +++ .../table/planner/plan/utils/FlinkRelOptUtil.scala | 2 +- .../digest/testGetDigestWithDynamicFunction.out | 12 ++-- .../testGetDigestWithDynamicFunctionView.out | 12 ++-- .../table/planner/plan/batch/sql/RankTest.xml | 16 ++--- .../FlinkLogicalRankRuleForConstantRangeTest.xml | 32 ++++----- .../FlinkLogicalRankRuleForRangeEndTest.xml | 2 +- .../planner/plan/stream/sql/DeduplicateTest.xml | 76 +++++++++++----------- .../table/planner/plan/stream/sql/RankTest.xml | 10 +-- .../flink/table/planner/utils/TableTestBase.scala | 28 ++++---- 11 files changed, 106 insertions(+), 99 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java index f184be0..4c8b07b 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java @@ -25,7 +25,6 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; -import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; @@ -69,9 +68,8 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable { @Override public RelNode convertToRel(RelOptTable.ToRelContext context) { - FlinkRelBuilder relBuilder = new FlinkRelBuilder( - // Sets up the view expander. - Contexts.of(context, context.getCluster().getPlanner().getContext()), + FlinkRelBuilder relBuilder = FlinkRelBuilder.of( + context, context.getCluster(), this.getRelOptSchema()); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala index 66b736b..47f1ab8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala @@ -188,4 +188,13 @@ object FlinkRelBuilder { cluster, relOptSchema) } + + def of(contextVar: Object, cluster: RelOptCluster, relOptSchema: RelOptSchema) + : FlinkRelBuilder = { + val mergedContext = Contexts.of(contextVar, cluster.getPlanner.getContext) + new FlinkRelBuilder( + mergedContext, + cluster, + relOptSchema) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala index 2a05b74..80e5945 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala @@ -65,7 +65,7 @@ object FlinkRelOptUtil { */ def toString( rel: RelNode, - detailLevel: SqlExplainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES, + detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES, withIdPrefix: Boolean = false, withRetractTraits: Boolean = false, withRowType: Boolean = false): String = { diff --git a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out index 9269167..152f3a4 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out +++ b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out @@ -1,14 +1,14 @@ LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)] LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)] -LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)] +LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)] LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] -LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] +LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)] -LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)] +LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)] LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] -LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] +LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)] -LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)] +LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)] LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] -LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] +LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)] diff --git a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out index 9269167..152f3a4 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out +++ b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out @@ -1,14 +1,14 @@ LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)] LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)] -LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)] +LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)] LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] -LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] +LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)] -LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)] +LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)] LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] -LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] +LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)] -LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)] +LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)] LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] -LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] +LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)] LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)] diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml index 1d41877..05bdc9e 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml @@ -34,8 +34,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, w0$o0]) -+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], global=[true], select=[a, b, rk, w0$o0]) +Calc(select=[a, b, $2]) ++- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], global=[true], select=[a, b, rk, $2]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], global=[false], select=[a, b, rk]) @@ -148,8 +148,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, w0$o0]) -+- Rank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b, c], orderBy=[a ASC], global=[true], select=[a, b, c, w0$o0]) +Calc(select=[a, b, $2]) ++- Rank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b, c], orderBy=[a ASC], global=[true], select=[a, b, c, $2]) +- Sort(orderBy=[b ASC, c ASC, a ASC]) +- Exchange(distribution=[hash[b, c]]) +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b, c], orderBy=[a ASC], global=[false], select=[a, b, c]) @@ -176,8 +176,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, w0$o0], where=[>(a, 10)]) -+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], global=[true], select=[a, b, c, w0$o0]) +Calc(select=[a, b, $2], where=[>(a, 10)]) ++- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], global=[true], select=[a, b, c, $2]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], global=[false], select=[a, b, c]) @@ -204,8 +204,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, w0$o0]) -+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], global=[true], select=[a, b, c, w0$o0]) +Calc(select=[a, b, $2]) ++- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], global=[true], select=[a, b, c, $2]) +- Sort(orderBy=[a ASC]) +- Exchange(distribution=[single]) +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], global=[false], select=[a, b, c]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml index b609e51..aff3213 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml @@ -34,8 +34,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0]) -+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], select=[a, b, rk, w0$o0]) +FlinkLogicalCalc(select=[a, b, $2]) ++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], orderBy=[a ASC], select=[a, b, rk, $2]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, rk)]]], fields=[a, b, rk]) ]]> </Resource> @@ -60,7 +60,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2], rn=[$3]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0, w1$o0], where=[<(w0$o0, 10)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2, w1$o0 AS $3], where=[<(w0$o0, 10)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])], window#1=[window(partition {1} order by [0 ASC-nulls-first] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -86,7 +86,7 @@ LogicalProject(a=[$0], b=[$1], rk1=[$2], rk2=[$3]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0, w1$o0], where=[<(w0$o0, 10)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2, w1$o0 AS $3], where=[<(w0$o0, 10)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])], window#1=[window(partition {2} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -160,7 +160,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(w0$o0, 2)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[>(w0$o0, 2)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 ASC-nulls-first, 2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -184,8 +184,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0]) -+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b,c], orderBy=[a ASC], select=[a, b, c, w0$o0]) +FlinkLogicalCalc(select=[a, b, $2]) ++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b,c], orderBy=[a ASC], select=[a, b, c, $2]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -208,8 +208,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(a, 10)]) -+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, w0$o0]) +FlinkLogicalCalc(select=[a, b, $2], where=[>(a, 10)]) ++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC], select=[a, b, c, $2]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -232,7 +232,7 @@ LogicalProject(a=[$0], b=[$1], rn=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[<=(w0$o0, 2)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[<=(w0$o0, 2)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 ASC-nulls-first] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -256,7 +256,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[<(w0$o0, a)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[<(w0$o0, a)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -280,7 +280,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(w0$o0, a)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[>(w0$o0, a)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -304,7 +304,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[AND(<(w0$o0, a), >(CAST(b), 5:BIGINT))]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[AND(<(w0$o0, a), >(CAST(b), 5:BIGINT))]) +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -328,7 +328,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0], where=[=(w0$o0, b)]) +FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[=(w0$o0, b)]) +- FlinkLogicalOverAggregate(window#0=[window(partition {0} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -352,8 +352,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, w0$o0]) -+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], select=[a, b, c, w0$o0]) +FlinkLogicalCalc(select=[a, b, $2]) ++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[a ASC], select=[a, b, c, $2]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml index 1ec496f..5993374 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml @@ -113,7 +113,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2]) </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, b, 2:BIGINT AS rk]) +FlinkLogicalCalc(select=[a, b, 2:BIGINT AS $2]) +- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], select=[a, b, c]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml index 6db3a25..a7a80d3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml @@ -36,7 +36,7 @@ LogicalProject(a=[$0], rank_num=[$1]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, 2:BIGINT AS rank_num]) +Calc(select=[a, 2:BIGINT AS $1]) +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[proctime DESC], select=[a, b, proctime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[a, b, proctime]) @@ -64,7 +64,7 @@ LogicalProject(a=[$0], rank_num=[$1]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, 3:BIGINT AS rank_num]) +Calc(select=[a, 3:BIGINT AS $1]) +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=3, rankEnd=3], partitionBy=[b], orderBy=[rowtime DESC], select=[a, b, rowtime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[a, b, rowtime]) @@ -102,117 +102,117 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIG ]]> </Resource> </TestCase> - <TestCase name="testSimpleFirstRowOnProctime"> + <TestCase name="testSimpleLastRowOnBuiltinProctime"> <Resource name="sql"> <![CDATA[ -SELECT a, b, c +SELECT * FROM ( SELECT *, - ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as rank_num - FROM MyTable) -WHERE rank_num = 1 + ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC) as rowNum + FROM MyTable +) +WHERE rowNum = 1 ]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(a=[$0], b=[$1], c=[$2]) +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[$5]) +- LogicalFilter(condition=[=($5, 1)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, c]) -+- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME]) - +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, b, c, proctime]) +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIGINT AS rowNum]) ++- Deduplicate(keep=[LastRow], key=[], order=[PROCTIME]) + +- Exchange(distribution=[single]) + +- Calc(select=[a, b, c, proctime, rowtime, PROCTIME() AS $5]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> - <TestCase name="testSimpleFirstRowOnRowtime"> + <TestCase name="testSimpleFirstRowOnProctime"> <Resource name="sql"> <![CDATA[ SELECT a, b, c FROM ( SELECT *, - ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as rank_num + ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as rank_num FROM MyTable) -WHERE rank_num <= 1 +WHERE rank_num = 1 ]]> </Resource> <Resource name="planBefore"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2]) -+- LogicalFilter(condition=[<=($5, 1)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) ++- LogicalFilter(condition=[=($5, 1)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ Calc(select=[a, b, c]) -+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], select=[a, b, c, rowtime]) ++- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, b, c, rowtime]) + +- Calc(select=[a, b, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> - <TestCase name="testSimpleLastRowOnBuiltinProctime"> + <TestCase name="testSimpleLastRowOnRowtime"> <Resource name="sql"> <![CDATA[ -SELECT * +SELECT a, b, c FROM ( SELECT *, - ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC) as rowNum - FROM MyTable -) -WHERE rowNum = 1 + ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num + FROM MyTable) +WHERE rank_num = 1 ]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[$5]) +LogicalProject(a=[$0], b=[$1], c=[$2]) +- LogicalFilter(condition=[=($5, 1)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIGINT AS rowNum]) -+- Deduplicate(keep=[LastRow], key=[], order=[PROCTIME]) - +- Exchange(distribution=[single]) - +- Calc(select=[a, b, c, proctime, rowtime, PROCTIME() AS $5]) +Calc(select=[a, b, c]) ++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> - <TestCase name="testSimpleLastRowOnRowtime"> + <TestCase name="testSimpleFirstRowOnRowtime"> <Resource name="sql"> <![CDATA[ SELECT a, b, c FROM ( SELECT *, - ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num + ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as rank_num FROM MyTable) -WHERE rank_num = 1 +WHERE rank_num <= 1 ]]> </Resource> <Resource name="planBefore"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2]) -+- LogicalFilter(condition=[=($5, 1)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) ++- LogicalFilter(condition=[<=($5, 1)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ Calc(select=[a, b, c]) -+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, b, c, rowtime]) ++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], select=[a, b, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, b, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml index c9ea8c8..e16ab3b 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml @@ -56,14 +56,14 @@ LogicalProject(a=[$0], b=[$1], count_c=[$2], rank_num=[$3]) <Resource name="planAfter"> <![CDATA[ Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc]) -+- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc]) ++- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0, w0$o0], updateAsRetraction=[false], accMode=[Acc]) +- Exchange(distribution=[single], updateAsRetraction=[false], accMode=[Acc]) - +- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc]) + +- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc]) +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc]) +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc]) +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc]) +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc]) - +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc]) + +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc]) ]]> </Resource> @@ -670,7 +670,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3]) </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], accMode=[Acc]) +Calc(select=[a, b, c, 10:BIGINT AS $3], updateAsRetraction=[false], accMode=[Acc]) +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=10, rankEnd=10], partitionBy=[a], orderBy=[b DESC], select=[a, b, c], updateAsRetraction=[false], accMode=[Acc]) +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc]) +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc]) @@ -780,7 +780,7 @@ Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankS +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc]) +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc]) +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc]) - +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc]) + +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index a93b5a2..c9d232c 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -261,7 +261,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) def verifyPlan(sql: String): Unit = { doVerifyPlan( sql, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = false, printPlanBefore = true) } @@ -269,7 +269,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) def verifyPlan(table: Table): Unit = { doVerifyPlan( table, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = false, printPlanBefore = true) } @@ -277,7 +277,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) def verifyPlanWithType(sql: String): Unit = { doVerifyPlan( sql, - explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES, + explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = true, printPlanBefore = true) } @@ -285,7 +285,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) def verifyPlanWithType(table: Table): Unit = { doVerifyPlan( table, - explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES, + explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = true, printPlanBefore = true) } @@ -299,7 +299,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) val relNode = TableTestUtil.toRelNode(table) val optimizedPlan = getOptimizedPlan( Array(relNode), - explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES, + explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRetractTraits = false, withRowType = false) val result = notExpected.forall(!optimizedPlan.contains(_)) @@ -357,7 +357,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) val planBefore = SystemUtils.LINE_SEPARATOR + FlinkRelOptUtil.toString( relNode, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = withRowType) assertEqualsOrExpand("planBefore", planBefore) } @@ -371,7 +371,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) val table = getTableEnv.sqlQuery(sql) doVerifyPlan( table, - explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES, + explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = false, withRetractTraits = false, printResource = true, @@ -410,7 +410,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) val planBefore = SystemUtils.LINE_SEPARATOR + FlinkRelOptUtil.toString( relNode, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = withRowType) assertEqualsOrExpand("planBefore", planBefore) } @@ -595,7 +595,7 @@ abstract class TableTestUtil( def verifySqlUpdate(sql: String): Unit = { doVerifySqlUpdate( sql, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = false, withRetractTraits = false, printPlanBefore = true) @@ -603,7 +603,7 @@ abstract class TableTestUtil( def verifyPlan(): Unit = { doVerifyPlan( - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRowType = false, withRetractTraits = false, printPlanBefore = true) @@ -644,7 +644,7 @@ abstract class TableTestUtil( val planBefore = new StringBuilder relNodes.foreach { sink => planBefore.append(System.lineSeparator) - planBefore.append(FlinkRelOptUtil.toString(sink, SqlExplainLevel.DIGEST_ATTRIBUTES)) + planBefore.append(FlinkRelOptUtil.toString(sink, SqlExplainLevel.EXPPLAN_ATTRIBUTES)) } assertEqualsOrExpand("planBefore", planBefore.toString()) } @@ -770,7 +770,7 @@ case class StreamTableTestUtil( def verifyPlanWithTrait(): Unit = { doVerifyPlan( - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRetractTraits = true, withRowType = false, printPlanBefore = true) @@ -779,7 +779,7 @@ case class StreamTableTestUtil( def verifyPlanWithTrait(sql: String): Unit = { doVerifyPlan( sql, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRetractTraits = true, withRowType = false, printPlanBefore = true) @@ -788,7 +788,7 @@ case class StreamTableTestUtil( def verifyPlanWithTrait(table: Table): Unit = { doVerifyPlan( table, - SqlExplainLevel.DIGEST_ATTRIBUTES, + SqlExplainLevel.EXPPLAN_ATTRIBUTES, withRetractTraits = true, withRowType = false, printPlanBefore = true)
