This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06489d1982e83e4890f69cfa3b492def38bb3dfd
Author: yuzhao.cyz <[email protected]>
AuthorDate: Tue Mar 17 21:11:38 2020 +0800

    [FLINK-14338][table-planner-blink] Plan verify changes from DIGEST to 
EXPLAIN
    
    * Because of CALCITE-3713, the project names were removed from the plan 
DIGEST; thus, the DIGEST plan has less info than EXPLAIN, so we switch to EXPLAIN 
for plan verification;
    * The CALC window field names were also changed by CALCITE-3713, which is 
acceptable because the old names were also unreadable; the new names are based on 
the field index, which is not bad.
---
 .../catalog/QueryOperationCatalogViewTable.java    |  6 +-
 .../table/planner/calcite/FlinkRelBuilder.scala    |  9 +++
 .../table/planner/plan/utils/FlinkRelOptUtil.scala |  2 +-
 .../digest/testGetDigestWithDynamicFunction.out    | 12 ++--
 .../testGetDigestWithDynamicFunctionView.out       | 12 ++--
 .../table/planner/plan/batch/sql/RankTest.xml      | 16 ++---
 .../FlinkLogicalRankRuleForConstantRangeTest.xml   | 32 ++++-----
 .../FlinkLogicalRankRuleForRangeEndTest.xml        |  2 +-
 .../planner/plan/stream/sql/DeduplicateTest.xml    | 76 +++++++++++-----------
 .../table/planner/plan/stream/sql/RankTest.xml     | 10 +--
 .../flink/table/planner/utils/TableTestBase.scala  | 28 ++++----
 11 files changed, 106 insertions(+), 99 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index f184be0..4c8b07b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
-import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -69,9 +68,8 @@ public class QueryOperationCatalogViewTable extends 
ExpandingPreparingTable {
 
        @Override
        public RelNode convertToRel(RelOptTable.ToRelContext context) {
-               FlinkRelBuilder relBuilder = new FlinkRelBuilder(
-                               // Sets up the view expander.
-                               Contexts.of(context, 
context.getCluster().getPlanner().getContext()),
+               FlinkRelBuilder relBuilder = FlinkRelBuilder.of(
+                               context,
                                context.getCluster(),
                                this.getRelOptSchema());
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
index 66b736b..47f1ab8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
@@ -188,4 +188,13 @@ object FlinkRelBuilder {
       cluster,
       relOptSchema)
   }
+
+  def of(contextVar: Object, cluster: RelOptCluster, relOptSchema: 
RelOptSchema)
+    : FlinkRelBuilder = {
+    val mergedContext = Contexts.of(contextVar, cluster.getPlanner.getContext)
+    new FlinkRelBuilder(
+      mergedContext,
+      cluster,
+      relOptSchema)
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
index 2a05b74..80e5945 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
@@ -65,7 +65,7 @@ object FlinkRelOptUtil {
     */
   def toString(
       rel: RelNode,
-      detailLevel: SqlExplainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
+      detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withIdPrefix: Boolean = false,
       withRetractTraits: Boolean = false,
       withRowType: Boolean = false): String = {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
 
b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
index 9269167..152f3a4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunction.out
@@ -1,14 +1,14 @@
 LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
 LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
-LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
+LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)]
 LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), 
rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
-LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER 
random, DOUBLE EXPR$1)]
+LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, 
DOUBLE EXPR$1)]
 LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), 
rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, 
VARCHAR(2147483647) last)]
-LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
+LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)]
 LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), 
rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
-LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER 
random, DOUBLE EXPR$1)]
+LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, 
DOUBLE EXPR$1)]
 LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), 
rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, 
VARCHAR(2147483647) last)]
-LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
+LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)]
 LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), 
rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
-LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER 
random, DOUBLE EXPR$1)]
+LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, 
DOUBLE EXPR$1)]
 LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), 
rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, 
VARCHAR(2147483647) last)]
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
 
b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
index 9269167..152f3a4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/digest/testGetDigestWithDynamicFunctionView.out
@@ -1,14 +1,14 @@
 LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
 LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
-LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
+LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)]
 LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), 
rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
-LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER 
random, DOUBLE EXPR$1)]
+LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, 
DOUBLE EXPR$1)]
 LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), 
rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, 
VARCHAR(2147483647) last)]
-LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
+LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)]
 LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), 
rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
-LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER 
random, DOUBLE EXPR$1)]
+LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, 
DOUBLE EXPR$1)]
 LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), 
rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, 
VARCHAR(2147483647) last)]
-LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
+LogicalProject(inputs=[0]), rowType=[RecordType(INTEGER random)]
 LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), 
rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
-LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER 
random, DOUBLE EXPR$1)]
+LogicalProject(exprs=[[$1, RAND()]]), rowType=[RecordType(INTEGER random, 
DOUBLE EXPR$1)]
 LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), 
rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, 
VARCHAR(2147483647) last)]
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
index 1d41877..05bdc9e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
@@ -34,8 +34,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, w0$o0])
-+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], 
orderBy=[a ASC], global=[true], select=[a, b, rk, w0$o0])
+Calc(select=[a, b, $2])
++- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[b], 
orderBy=[a ASC], global=[true], select=[a, b, rk, $2])
    +- Sort(orderBy=[b ASC, a ASC])
       +- Exchange(distribution=[hash[b]])
          +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], 
partitionBy=[b], orderBy=[a ASC], global=[false], select=[a, b, rk])
@@ -148,8 +148,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, w0$o0])
-+- Rank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b, 
c], orderBy=[a ASC], global=[true], select=[a, b, c, w0$o0])
+Calc(select=[a, b, $2])
++- Rank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b, 
c], orderBy=[a ASC], global=[true], select=[a, b, c, $2])
    +- Sort(orderBy=[b ASC, c ASC, a ASC])
       +- Exchange(distribution=[hash[b, c]])
          +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], 
partitionBy=[b, c], orderBy=[a ASC], global=[false], select=[a, b, c])
@@ -176,8 +176,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, w0$o0], where=[>(a, 10)])
-+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], 
orderBy=[a ASC], global=[true], select=[a, b, c, w0$o0])
+Calc(select=[a, b, $2], where=[>(a, 10)])
++- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], 
orderBy=[a ASC], global=[true], select=[a, b, c, $2])
    +- Sort(orderBy=[b ASC, a ASC])
       +- Exchange(distribution=[hash[b]])
          +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], 
partitionBy=[b], orderBy=[a ASC], global=[false], select=[a, b, c])
@@ -204,8 +204,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, w0$o0])
-+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], 
orderBy=[a ASC], global=[true], select=[a, b, c, w0$o0])
+Calc(select=[a, b, $2])
++- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], 
orderBy=[a ASC], global=[true], select=[a, b, c, $2])
    +- Sort(orderBy=[a ASC])
       +- Exchange(distribution=[single])
          +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], 
partitionBy=[], orderBy=[a ASC], global=[false], select=[a, b, c])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml
index b609e51..aff3213 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml
@@ -34,8 +34,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], 
partitionBy=[b], orderBy=[a ASC], select=[a, b, rk, w0$o0])
+FlinkLogicalCalc(select=[a, b, $2])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], 
partitionBy=[b], orderBy=[a ASC], select=[a, b, rk, $2])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, rk)]]], fields=[a, b, rk])
 ]]>
     </Resource>
@@ -60,7 +60,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2], rn=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0, w1$o0], where=[<(w0$o0, 10)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2, w1$o0 AS $3], where=[<(w0$o0, 10)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 
ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[RANK()])], window#1=[window(partition {1} order by [0 ASC-nulls-first] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -86,7 +86,7 @@ LogicalProject(a=[$0], b=[$1], rk1=[$2], rk2=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0, w1$o0], where=[<(w0$o0, 10)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2, w1$o0 AS $3], where=[<(w0$o0, 10)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 
ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[RANK()])], window#1=[window(partition {2} order by [0 ASC-nulls-first] range 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -160,7 +160,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(w0$o0, 2)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[>(w0$o0, 2)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 
ASC-nulls-first, 2 ASC-nulls-first] range between UNBOUNDED PRECEDING and 
CURRENT ROW aggs [RANK()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -184,8 +184,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], 
partitionBy=[b,c], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalCalc(select=[a, b, $2])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], 
partitionBy=[b,c], orderBy=[a ASC], select=[a, b, c, $2])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -208,8 +208,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(a, 10)])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], 
partitionBy=[b], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalCalc(select=[a, b, $2], where=[>(a, 10)])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], 
partitionBy=[b], orderBy=[a ASC], select=[a, b, c, $2])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -232,7 +232,7 @@ LogicalProject(a=[$0], b=[$1], rn=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[<=(w0$o0, 2)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[<=(w0$o0, 2)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [0 
ASC-nulls-first] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[ROW_NUMBER()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -256,7 +256,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[<(w0$o0, a)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[<(w0$o0, a)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [2 
ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[RANK()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -280,7 +280,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[>(w0$o0, a)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[>(w0$o0, a)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [2 
ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[RANK()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -304,7 +304,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[AND(<(w0$o0, a), >(CAST(b), 
5:BIGINT))])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[AND(<(w0$o0, a), 
>(CAST(b), 5:BIGINT))])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {1} order by [2 
ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[RANK()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -328,7 +328,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0], where=[=(w0$o0, b)])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[=(w0$o0, b)])
 +- FlinkLogicalOverAggregate(window#0=[window(partition {0} order by [2 
ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs 
[RANK()])])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -352,8 +352,8 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], 
partitionBy=[], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalCalc(select=[a, b, $2])
++- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=9], 
partitionBy=[], orderBy=[a ASC], select=[a, b, c, $2])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
index 1ec496f..5993374 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
@@ -113,7 +113,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, 2:BIGINT AS rk])
+FlinkLogicalCalc(select=[a, b, 2:BIGINT AS $2])
 +- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], 
partitionBy=[b], orderBy=[a ASC, c ASC], select=[a, b, c])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
index 6db3a25..a7a80d3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
@@ -36,7 +36,7 @@ LogicalProject(a=[$0], rank_num=[$1])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, 2:BIGINT AS rank_num])
+Calc(select=[a, 2:BIGINT AS $1])
 +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[proctime DESC], 
select=[a, b, proctime])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[a, b, proctime])
@@ -64,7 +64,7 @@ LogicalProject(a=[$0], rank_num=[$1])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, 3:BIGINT AS rank_num])
+Calc(select=[a, 3:BIGINT AS $1])
 +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=3, rankEnd=3], partitionBy=[b], orderBy=[rowtime DESC], 
select=[a, b, rowtime])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[a, b, rowtime])
@@ -102,117 +102,117 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS 
proctime, rowtime, 1:BIG
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSimpleFirstRowOnProctime">
+  <TestCase name="testSimpleLastRowOnBuiltinProctime">
     <Resource name="sql">
       <![CDATA[
-SELECT a, b, c
+SELECT *
 FROM (
   SELECT *,
-      ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as rank_num
-  FROM MyTable)
-WHERE rank_num = 1
+    ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC) as rowNum
+  FROM MyTable
+)
+WHERE rowNum = 1
       ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2])
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rowNum=[$5])
 +- LogicalFilter(condition=[=($5, 1)])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rowNum=[ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC NULLS LAST ROWS BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c])
-+- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, b, c, proctime])
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
1:BIGINT AS rowNum])
++- Deduplicate(keep=[LastRow], key=[], order=[PROCTIME])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[a, b, c, proctime, rowtime, PROCTIME() AS $5])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSimpleFirstRowOnRowtime">
+  <TestCase name="testSimpleFirstRowOnProctime">
     <Resource name="sql">
       <![CDATA[
 SELECT a, b, c
 FROM (
   SELECT *,
-      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as rank_num
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as rank_num
   FROM MyTable)
-WHERE rank_num <= 1
+WHERE rank_num = 1
       ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalFilter(condition=[<=($5, 1)])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
++- LogicalFilter(condition=[=($5, 1)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], 
select=[a, b, c, rowtime])
++- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME])
    +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, b, c, rowtime])
+      +- Calc(select=[a, b, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSimpleLastRowOnBuiltinProctime">
+  <TestCase name="testSimpleLastRowOnRowtime">
     <Resource name="sql">
       <![CDATA[
-SELECT *
+SELECT a, b, c
 FROM (
   SELECT *,
-    ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC) as rowNum
-  FROM MyTable
-)
-WHERE rowNum = 1
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+  FROM MyTable)
+WHERE rank_num = 1
       ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rowNum=[$5])
+LogicalProject(a=[$0], b=[$1], c=[$2])
 +- LogicalFilter(condition=[=($5, 1)])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rowNum=[ROW_NUMBER() OVER (ORDER BY PROCTIME() DESC NULLS LAST ROWS BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
1:BIGINT AS rowNum])
-+- Deduplicate(keep=[LastRow], key=[], order=[PROCTIME])
-   +- Exchange(distribution=[single])
-      +- Calc(select=[a, b, c, proctime, rowtime, PROCTIME() AS $5])
+Calc(select=[a, b, c])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], 
select=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSimpleLastRowOnRowtime">
+  <TestCase name="testSimpleFirstRowOnRowtime">
     <Resource name="sql">
       <![CDATA[
 SELECT a, b, c
 FROM (
   SELECT *,
-      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rank_num
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as rank_num
   FROM MyTable)
-WHERE rank_num = 1
+WHERE rank_num <= 1
       ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalFilter(condition=[=($5, 1)])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
++- LogicalFilter(condition=[<=($5, 1)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], 
rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], 
select=[a, b, c, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime ASC], 
select=[a, b, c, rowtime])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, b, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index c9ea8c8..e16ab3b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -56,14 +56,14 @@ LogicalProject(a=[$0], b=[$1], count_c=[$2], rank_num=[$3])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
-+- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], 
select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
++- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], 
select=[a, b, count_c, w0$o0, w0$o0], updateAsRetraction=[false], accMode=[Acc])
    +- Exchange(distribution=[single], updateAsRetraction=[false], 
accMode=[Acc])
-      +- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], 
select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
+      +- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], 
select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
          +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], 
accMode=[Acc])
             +- Calc(select=[a, b, count_c], updateAsRetraction=[false], 
accMode=[Acc])
                +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS 
count_c], updateAsRetraction=[false], accMode=[Acc])
                   +- Exchange(distribution=[hash[a, b]], 
updateAsRetraction=[true], accMode=[Acc])
-                     +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], 
updateAsRetraction=[true], accMode=[Acc])
+                     +- Calc(select=[a, b], updateAsRetraction=[true], 
accMode=[Acc])
                         +- DataStreamScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, proctime, rowtime], 
updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
@@ -670,7 +670,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], row_num=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], 
accMode=[Acc])
+Calc(select=[a, b, c, 10:BIGINT AS $3], updateAsRetraction=[false], 
accMode=[Acc])
 +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=10, rankEnd=10], partitionBy=[a], orderBy=[b DESC], 
select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
    +- Exchange(distribution=[hash[a]], updateAsRetraction=[false], 
accMode=[Acc])
       +- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
@@ -780,7 +780,7 @@ Rank(strategy=[UpdateFastStrategy[0,1]], 
rankType=[ROW_NUMBER], rankRange=[rankS
    +- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
       +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], 
updateAsRetraction=[false], accMode=[Acc])
          +- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], 
accMode=[Acc])
-            +- Calc(select=[a, b, _UTF-16LE'cn' AS cn], 
updateAsRetraction=[true], accMode=[Acc])
+            +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
                +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], 
accMode=[Acc])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index a93b5a2..c9d232c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -261,7 +261,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
   def verifyPlan(sql: String): Unit = {
     doVerifyPlan(
       sql,
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = false,
       printPlanBefore = true)
   }
@@ -269,7 +269,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
   def verifyPlan(table: Table): Unit = {
     doVerifyPlan(
       table,
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = false,
       printPlanBefore = true)
   }
@@ -277,7 +277,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
   def verifyPlanWithType(sql: String): Unit = {
     doVerifyPlan(
       sql,
-      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = true,
       printPlanBefore = true)
   }
@@ -285,7 +285,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
   def verifyPlanWithType(table: Table): Unit = {
     doVerifyPlan(
       table,
-      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = true,
       printPlanBefore = true)
   }
@@ -299,7 +299,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
     val relNode = TableTestUtil.toRelNode(table)
     val optimizedPlan = getOptimizedPlan(
       Array(relNode),
-      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRetractTraits = false,
       withRowType = false)
     val result = notExpected.forall(!optimizedPlan.contains(_))
@@ -357,7 +357,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
       val planBefore = SystemUtils.LINE_SEPARATOR +
         FlinkRelOptUtil.toString(
           relNode,
-          SqlExplainLevel.DIGEST_ATTRIBUTES,
+          SqlExplainLevel.EXPPLAN_ATTRIBUTES,
           withRowType = withRowType)
       assertEqualsOrExpand("planBefore", planBefore)
     }
@@ -371,7 +371,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
     val table = getTableEnv.sqlQuery(sql)
     doVerifyPlan(
       table,
-      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = false,
       withRetractTraits = false,
       printResource = true,
@@ -410,7 +410,7 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
       val planBefore = SystemUtils.LINE_SEPARATOR +
         FlinkRelOptUtil.toString(
           relNode,
-          SqlExplainLevel.DIGEST_ATTRIBUTES,
+          SqlExplainLevel.EXPPLAN_ATTRIBUTES,
           withRowType = withRowType)
       assertEqualsOrExpand("planBefore", planBefore)
     }
@@ -595,7 +595,7 @@ abstract class TableTestUtil(
   def verifySqlUpdate(sql: String): Unit = {
     doVerifySqlUpdate(
       sql,
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = false,
       withRetractTraits = false,
       printPlanBefore = true)
@@ -603,7 +603,7 @@ abstract class TableTestUtil(
 
   def verifyPlan(): Unit = {
     doVerifyPlan(
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRowType = false,
       withRetractTraits = false,
       printPlanBefore = true)
@@ -644,7 +644,7 @@ abstract class TableTestUtil(
       val planBefore = new StringBuilder
       relNodes.foreach { sink =>
         planBefore.append(System.lineSeparator)
-        planBefore.append(FlinkRelOptUtil.toString(sink, 
SqlExplainLevel.DIGEST_ATTRIBUTES))
+        planBefore.append(FlinkRelOptUtil.toString(sink, 
SqlExplainLevel.EXPPLAN_ATTRIBUTES))
       }
       assertEqualsOrExpand("planBefore", planBefore.toString())
     }
@@ -770,7 +770,7 @@ case class StreamTableTestUtil(
 
   def verifyPlanWithTrait(): Unit = {
     doVerifyPlan(
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRetractTraits = true,
       withRowType = false,
       printPlanBefore = true)
@@ -779,7 +779,7 @@ case class StreamTableTestUtil(
   def verifyPlanWithTrait(sql: String): Unit = {
     doVerifyPlan(
       sql,
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRetractTraits = true,
       withRowType = false,
       printPlanBefore = true)
@@ -788,7 +788,7 @@ case class StreamTableTestUtil(
   def verifyPlanWithTrait(table: Table): Unit = {
     doVerifyPlan(
       table,
-      SqlExplainLevel.DIGEST_ATTRIBUTES,
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withRetractTraits = true,
       withRowType = false,
       printPlanBefore = true)

Reply via email to