This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dc0b143  [FLINK-20782][table-planner-blink] Introduce 
BatchPhysicalRank, and make BatchExecRank only extended from ExecNode
dc0b143 is described below

commit dc0b143bb85bd674ea8fe186d1d2bb72d3bd70a9
Author: 龙三 <[email protected]>
AuthorDate: Mon Dec 28 15:05:04 2020 +0800

    [FLINK-20782][table-planner-blink] Introduce BatchPhysicalRank, and make 
BatchExecRank only extended from ExecNode
    
    This closes #14506
---
 .../plan/nodes/exec/batch/BatchExecRank.java       | 121 +++++++++++++++++++++
 ...BatchExecRank.scala => BatchPhysicalRank.scala} |  74 +++----------
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   2 +-
 ...cRankRule.scala => BatchPhysicalRankRule.scala} |  22 ++--
 .../batch/RemoveRedundantLocalRankRule.scala       |  16 +--
 .../metadata/FlinkRelMdColumnIntervalTest.scala    |   5 +-
 .../metadata/FlinkRelMdDistinctRowCountTest.scala  |   4 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   8 +-
 .../metadata/FlinkRelMdPopulationSizeTest.scala    |   4 +-
 .../plan/metadata/FlinkRelMdSelectivityTest.scala  |   4 +-
 10 files changed, 168 insertions(+), 92 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
new file mode 100644
index 0000000..4da5998
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.operators.sort.RankOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * {@link BatchExecNode} for Rank.
+ *
+ * <p>This node supports two-stage(local and global) rank to reduce 
data-shuffling.
+ */
+public class BatchExecRank extends ExecNodeBase<RowData> implements 
BatchExecNode<RowData> {
+
+    private final int[] partitionFields;
+    private final int[] sortFields;
+    private final long rankStart;
+    private final long rankEnd;
+    private final boolean outputRankNumber;
+
+    public BatchExecRank(
+            int[] partitionFields,
+            int[] sortFields,
+            long rankStart,
+            long rankEnd,
+            boolean outputRankNumber,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.partitionFields = partitionFields;
+        this.sortFields = sortFields;
+        this.rankStart = rankStart;
+        this.rankEnd = rankEnd;
+        this.outputRankNumber = outputRankNumber;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        ExecNode<RowData> inputNode = (ExecNode<RowData>) 
getInputNodes().get(0);
+        Transformation<RowData> inputTransform = 
inputNode.translateToPlan(planner);
+
+        RowType inputType = (RowType) inputNode.getOutputType();
+        LogicalType[] partitionTypes =
+                IntStream.of(partitionFields)
+                        .mapToObj(inputType::getTypeAt)
+                        .toArray(LogicalType[]::new);
+        LogicalType[] sortTypes =
+                
IntStream.of(sortFields).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+
+        // operator needn't cache data
+        // The collation for the partition-by and order-by fields is 
inessential here,
+        // we only use the comparator to distinguish fields change.
+        RankOperator operator =
+                new RankOperator(
+                        ComparatorCodeGenerator.gen(
+                                planner.getTableConfig(),
+                                "PartitionByComparator",
+                                partitionFields,
+                                partitionTypes,
+                                new boolean[partitionFields.length],
+                                new boolean[partitionFields.length]),
+                        ComparatorCodeGenerator.gen(
+                                planner.getTableConfig(),
+                                "OrderByComparator",
+                                sortFields,
+                                sortTypes,
+                                new boolean[sortFields.length],
+                                new boolean[sortFields.length]),
+                        rankStart,
+                        rankEnd,
+                        outputRankNumber);
+
+        OneInputTransformation<RowData, RowData> transform =
+                new OneInputTransformation<>(
+                        inputTransform,
+                        getDesc(),
+                        SimpleOperatorFactory.of(operator),
+                        InternalTypeInfo.of((RowType) getOutputType()),
+                        inputTransform.getParallelism());
+
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+
+        return transform;
+    }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
similarity index 78%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
index afc811e..881be79 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
@@ -18,23 +18,16 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
-import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, 
FlinkRelDistributionTraitDef}
 import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.planner.plan.nodes.calcite.Rank
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
-import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, 
ExecEdge}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecRank
 import 
org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecJoinRuleBase
 import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, 
RelExplainUtil}
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankRange, RankType}
-import org.apache.flink.table.runtime.operators.sort.RankOperator
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelDistribution.Type
@@ -53,7 +46,7 @@ import scala.collection.JavaConversions._
   *
   * This node supports two-stage(local and global) rank to reduce 
data-shuffling.
   */
-class BatchExecRank(
+class BatchPhysicalRank(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -74,8 +67,7 @@ class BatchExecRank(
     rankRange,
     rankNumberType,
     outputRankNumber)
-  with BatchPhysicalRel
-  with LegacyBatchExecNode[RowData] {
+  with BatchPhysicalRel {
 
   require(rankType == RankType.RANK, "Only RANK is supported now")
   val (rankStart, rankEnd) = rankRange match {
@@ -84,7 +76,7 @@ class BatchExecRank(
   }
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new BatchExecRank(
+    new BatchPhysicalRank(
       cluster,
       traitSet,
       inputs.get(0),
@@ -235,54 +227,16 @@ class BatchExecRank(
     }
   }
 
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val input = getInputNodes.get(0).translateToPlan(planner)
-        .asInstanceOf[Transformation[RowData]]
-    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
-    val partitionBySortingKeys = partitionKey.toArray
-    // The collation for the partition-by fields is inessential here, we only 
use the
-    // comparator to distinguish different groups.
-    // (order[is_asc], null_is_last)
-    val partitionBySortCollation = partitionBySortingKeys.map(_ => (true, 
true))
-
-    // The collation for the order-by fields is inessential here, we only use 
the
-    // comparator to distinguish order-by fields change.
-    // (order[is_asc], null_is_last)
-    val orderByCollation = orderKey.getFieldCollations.map(_ => (true, 
true)).toArray
-    val orderByKeys = orderKey.getFieldCollations.map(_.getFieldIndex).toArray
-
-    val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
-    //operator needn't cache data
-    val operator = new RankOperator(
-      ComparatorCodeGenerator.gen(
-        planner.getTableConfig,
-        "PartitionByComparator",
-        partitionBySortingKeys,
-        partitionBySortingKeys.map(inputType.getTypeAt),
-        partitionBySortCollation.map(_._1),
-        partitionBySortCollation.map(_._2)),
-      ComparatorCodeGenerator.gen(
-        planner.getTableConfig,
-        "OrderByComparator",
-        orderByKeys,
-        orderByKeys.map(inputType.getTypeAt),
-        orderByCollation.map(_._1),
-        orderByCollation.map(_._2)),
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecRank(
+      partitionKey.toArray,
+      orderKey.getFieldCollations.map(_.getFieldIndex).toArray,
       rankStart,
       rankEnd,
-      outputRankNumber)
-
-    ExecNodeUtil.createOneInputTransformation(
-      input,
-      getRelDetailedDescription,
-      SimpleOperatorFactory.of(operator),
-      InternalTypeInfo.of(outputType),
-      input.getParallelism,
-      0)
+      outputRankNumber,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index fc75428..a1801e9 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -409,7 +409,7 @@ object FlinkBatchRuleSets {
     BatchPhysicalLimitRule.INSTANCE,
     BatchExecSortLimitRule.INSTANCE,
     // rank
-    BatchExecRankRule.INSTANCE,
+    BatchPhysicalRankRule.INSTANCE,
     RemoveRedundantLocalRankRule.INSTANCE,
     // expand
     BatchPhysicalExpandRule.INSTANCE,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
similarity index 90%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
index 9571bb7..2031bc1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
 
@@ -36,18 +36,18 @@ import scala.collection.JavaConversions._
   * Rule that matches [[FlinkLogicalRank]] with rank function and constant 
rank range,
   * and converts it to
   * {{{
-  * BatchExecRank (global)
+  * BatchPhysicalRank (global)
   * +- BatchPhysicalExchange (singleton if partition keys is empty, else hash)
-  *    +- BatchExecRank (local)
+  *    +- BatchPhysicalRank (local)
   *       +- input of rank
   * }}}
   */
-class BatchExecRankRule
+class BatchPhysicalRankRule
   extends ConverterRule(
     classOf[FlinkLogicalRank],
     FlinkConventions.LOGICAL,
     FlinkConventions.BATCH_PHYSICAL,
-    "BatchExecRankRule") {
+    "BatchPhysicalRankRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val rank: FlinkLogicalRank = call.rel(0)
@@ -70,9 +70,9 @@ class BatchExecRankRule
     val localRequiredTraitSet = emptyTraits.replace(sortCollation)
     val newLocalInput = RelOptRule.convert(rank.getInput, 
localRequiredTraitSet)
 
-    // create local BatchExecRank
+    // create local BatchPhysicalRank
     val localRankRange = new ConstantRankRange(1, rankEnd) // local rank 
always start from 1
-    val localRank = new BatchExecRank(
+    val localRank = new BatchPhysicalRank(
       cluster,
       emptyTraits,
       newLocalInput,
@@ -85,7 +85,7 @@ class BatchExecRankRule
       isGlobal = false
     )
 
-    // create local BatchExecRank
+    // create local BatchPhysicalRank
     val globalRequiredDistribution = if (rank.partitionKey.isEmpty) {
       FlinkRelDistribution.SINGLETON
     } else {
@@ -98,7 +98,7 @@ class BatchExecRankRule
 
     // require SINGLETON or HASH exchange
     val newGlobalInput = RelOptRule.convert(localRank, globalRequiredTraitSet)
-    val globalRank = new BatchExecRank(
+    val globalRank = new BatchPhysicalRank(
       cluster,
       emptyTraits,
       newGlobalInput,
@@ -114,6 +114,6 @@ class BatchExecRankRule
   }
 }
 
-object BatchExecRankRule {
-  val INSTANCE: RelOptRule = new BatchExecRankRule
+object BatchPhysicalRankRule {
+  val INSTANCE: RelOptRule = new BatchPhysicalRankRule
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
index 7d8d488..50490c6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.rules.physical.batch
 
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
@@ -28,18 +28,18 @@ import org.apache.calcite.rel.RelNode
 import scala.collection.JavaConversions._
 
 /**
-  * Planner rule that matches a global [[BatchExecRank]] on a local 
[[BatchExecRank]],
-  * and merge them into a global [[BatchExecRank]].
+  * Planner rule that matches a global [[BatchPhysicalRank]] on a local 
[[BatchPhysicalRank]],
+  * and merge them into a global [[BatchPhysicalRank]].
   */
 class RemoveRedundantLocalRankRule extends RelOptRule(
-  operand(classOf[BatchExecRank],
-    operand(classOf[BatchExecRank],
+  operand(classOf[BatchPhysicalRank],
+    operand(classOf[BatchPhysicalRank],
       operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
   "RemoveRedundantLocalRankRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val globalRank: BatchExecRank = call.rel(0)
-    val localRank: BatchExecRank = call.rel(1)
+    val globalRank: BatchPhysicalRank = call.rel(0)
+    val localRank: BatchPhysicalRank = call.rel(1)
     globalRank.isGlobal && !localRank.isGlobal &&
       globalRank.rankType == localRank.rankType &&
       globalRank.partitionKey == localRank.partitionKey &&
@@ -48,7 +48,7 @@ class RemoveRedundantLocalRankRule extends RelOptRule(
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
-    val globalRank: BatchExecRank = call.rel(0)
+    val globalRank: BatchPhysicalRank = call.rel(0)
     val inputOfLocalRank: RelNode = call.rel(2)
     val newGlobalRank = globalRank.copy(globalRank.getTraitSet, 
List(inputOfLocalRank))
     call.transformTo(newGlobalRank)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index da04c77..b235993 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
 import org.apache.flink.table.planner.plan.stats._
 import org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil
 import org.apache.flink.table.planner.{JBoolean, JDouble}
@@ -331,7 +331,8 @@ class FlinkRelMdColumnIntervalTest extends 
FlinkRelMdHandlerTestBase {
         assertNull(mq.getColumnInterval(rank, 5))
         assertNull(mq.getColumnInterval(rank, 6))
         rank match {
-          case r: BatchExecRank if !r.isGlobal => // local batch rank does not 
output rank function
+          case r: BatchPhysicalRank if !r.isGlobal =>
+              // local batch rank does not output rank function
           case _ => assertEquals(ValueInterval(bd(1), bd(5)), 
mq.getColumnInterval(rank, 7))
         }
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
index a698b12..d81e8cd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.metadata
 
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
 import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
 
 import org.apache.calcite.rel.metadata.RelMdUtil
@@ -307,7 +307,7 @@ class FlinkRelMdDistinctRowCountTest extends 
FlinkRelMdHandlerTestBase {
         assertEquals(2.0, mq.getDistinctRowCount(rank, ImmutableBitSet.of(5), 
null))
         assertEquals(null, mq.getDistinctRowCount(rank, ImmutableBitSet.of(6), 
null))
         rank match {
-          case r: BatchExecRank if !r.isGlobal => // local rank does not 
output rank func
+          case r: BatchPhysicalRank if !r.isGlobal => // local rank does not 
output rank func
           case _ =>
             assertEquals(5.0, mq.getDistinctRowCount(rank, 
ImmutableBitSet.of(7), null))
         }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index cbc71d4..c571a16 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -448,7 +448,7 @@ class FlinkRelMdHandlerTestBase {
       outputRankNumber = true
     )
 
-    val batchLocalRank = new BatchExecRank(
+    val batchLocalRank = new BatchPhysicalRank(
       cluster,
       batchPhysicalTraits,
       studentBatchScan,
@@ -464,7 +464,7 @@ class FlinkRelMdHandlerTestBase {
     val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true)
     val batchExchange = new BatchPhysicalExchange(
       cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank, 
hash6)
-    val batchGlobalRank = new BatchExecRank(
+    val batchGlobalRank = new BatchPhysicalRank(
       cluster,
       batchPhysicalTraits,
       batchExchange,
@@ -530,7 +530,7 @@ class FlinkRelMdHandlerTestBase {
       outputRankNumber = true
     )
 
-    val batchLocalRank = new BatchExecRank(
+    val batchLocalRank = new BatchPhysicalRank(
       cluster,
       batchPhysicalTraits,
       studentBatchScan,
@@ -546,7 +546,7 @@ class FlinkRelMdHandlerTestBase {
     val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true)
     val batchExchange = new BatchPhysicalExchange(
       cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank, 
hash6)
-    val batchGlobalRank = new BatchExecRank(
+    val batchGlobalRank = new BatchPhysicalRank(
       cluster,
       batchPhysicalTraits,
       batchExchange,
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
index db79714..e57c3ef 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.metadata
 
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
 
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.ImmutableBitSet
@@ -179,7 +179,7 @@ class FlinkRelMdPopulationSizeTest extends 
FlinkRelMdHandlerTestBase {
         assertNull(mq.getPopulationSize(rank, ImmutableBitSet.of(6)))
         assertEquals(50.0, mq.getPopulationSize(rank, ImmutableBitSet.of(0, 
2)))
         rank match {
-          case r: BatchExecRank =>
+          case r: BatchPhysicalRank =>
             // local batch rank does not output rank func
             // TODO re-check this
             if (r.isGlobal) {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
index 906a7f8..dab451c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.metadata
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalExpand
 import 
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalDataStreamTableScan,
 FlinkLogicalExpand, FlinkLogicalOverAggregate}
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecRank, 
BatchPhysicalCalc}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalRank, 
BatchPhysicalCalc}
 import org.apache.flink.table.planner.plan.utils.ExpandUtil
 
 import com.google.common.collect.{ImmutableList, Lists}
@@ -196,7 +196,7 @@ class FlinkRelMdSelectivityTest extends 
FlinkRelMdHandlerTestBase {
         assertEquals(1.0 / 7.0, mq.getSelectivity(rank, condition1))
 
         rank match {
-          case r: BatchExecRank if !r.isGlobal => // batch local rank does not 
output rank fun
+          case r: BatchPhysicalRank if !r.isGlobal => // batch local rank does 
not output rank fun
           case _ =>
             // rk > 2
             val condition2 =

Reply via email to