This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dc0b143 [FLINK-20782][table-planner-blink] Introduce
BatchPhysicalRank, and make BatchExecRank only extended from ExecNode
dc0b143 is described below
commit dc0b143bb85bd674ea8fe186d1d2bb72d3bd70a9
Author: 龙三 <[email protected]>
AuthorDate: Mon Dec 28 15:05:04 2020 +0800
[FLINK-20782][table-planner-blink] Introduce BatchPhysicalRank, and make
BatchExecRank only extended from ExecNode
This closes #14506
---
.../plan/nodes/exec/batch/BatchExecRank.java | 121 +++++++++++++++++++++
...BatchExecRank.scala => BatchPhysicalRank.scala} | 74 +++----------
.../planner/plan/rules/FlinkBatchRuleSets.scala | 2 +-
...cRankRule.scala => BatchPhysicalRankRule.scala} | 22 ++--
.../batch/RemoveRedundantLocalRankRule.scala | 16 +--
.../metadata/FlinkRelMdColumnIntervalTest.scala | 5 +-
.../metadata/FlinkRelMdDistinctRowCountTest.scala | 4 +-
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 8 +-
.../metadata/FlinkRelMdPopulationSizeTest.scala | 4 +-
.../plan/metadata/FlinkRelMdSelectivityTest.scala | 4 +-
10 files changed, 168 insertions(+), 92 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
new file mode 100644
index 0000000..4da5998
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecRank.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.operators.sort.RankOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * {@link BatchExecNode} for Rank.
+ *
+ * <p>This node supports two-stage(local and global) rank to reduce
data-shuffling.
+ */
+public class BatchExecRank extends ExecNodeBase<RowData> implements
BatchExecNode<RowData> {
+
+ private final int[] partitionFields;
+ private final int[] sortFields;
+ private final long rankStart;
+ private final long rankEnd;
+ private final boolean outputRankNumber;
+
+ public BatchExecRank(
+ int[] partitionFields,
+ int[] sortFields,
+ long rankStart,
+ long rankEnd,
+ boolean outputRankNumber,
+ ExecEdge inputEdge,
+ RowType outputType,
+ String description) {
+ super(Collections.singletonList(inputEdge), outputType, description);
+ this.partitionFields = partitionFields;
+ this.sortFields = sortFields;
+ this.rankStart = rankStart;
+ this.rankEnd = rankEnd;
+ this.outputRankNumber = outputRankNumber;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Transformation<RowData> translateToPlanInternal(PlannerBase
planner) {
+ ExecNode<RowData> inputNode = (ExecNode<RowData>)
getInputNodes().get(0);
+ Transformation<RowData> inputTransform =
inputNode.translateToPlan(planner);
+
+ RowType inputType = (RowType) inputNode.getOutputType();
+ LogicalType[] partitionTypes =
+ IntStream.of(partitionFields)
+ .mapToObj(inputType::getTypeAt)
+ .toArray(LogicalType[]::new);
+ LogicalType[] sortTypes =
+
IntStream.of(sortFields).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+
+ // operator needn't cache data
+ // The collation for the partition-by and order-by fields is
inessential here,
+ // we only use the comparator to distinguish fields change.
+ RankOperator operator =
+ new RankOperator(
+ ComparatorCodeGenerator.gen(
+ planner.getTableConfig(),
+ "PartitionByComparator",
+ partitionFields,
+ partitionTypes,
+ new boolean[partitionFields.length],
+ new boolean[partitionFields.length]),
+ ComparatorCodeGenerator.gen(
+ planner.getTableConfig(),
+ "OrderByComparator",
+ sortFields,
+ sortTypes,
+ new boolean[sortFields.length],
+ new boolean[sortFields.length]),
+ rankStart,
+ rankEnd,
+ outputRankNumber);
+
+ OneInputTransformation<RowData, RowData> transform =
+ new OneInputTransformation<>(
+ inputTransform,
+ getDesc(),
+ SimpleOperatorFactory.of(operator),
+ InternalTypeInfo.of((RowType) getOutputType()),
+ inputTransform.getParallelism());
+
+ if (inputsContainSingleton()) {
+ transform.setParallelism(1);
+ transform.setMaxParallelism(1);
+ }
+
+ return transform;
+ }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
similarity index 78%
rename from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
rename to
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
index afc811e..881be79 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
@@ -18,23 +18,16 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
-import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution,
FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.nodes.calcite.Rank
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
-import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode,
ExecEdge}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecRank
import
org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecJoinRuleBase
import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil,
RelExplainUtil}
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange,
RankRange, RankType}
-import org.apache.flink.table.runtime.operators.sort.RankOperator
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelDistribution.Type
@@ -53,7 +46,7 @@ import scala.collection.JavaConversions._
*
* This node supports two-stage(local and global) rank to reduce
data-shuffling.
*/
-class BatchExecRank(
+class BatchPhysicalRank(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
@@ -74,8 +67,7 @@ class BatchExecRank(
rankRange,
rankNumberType,
outputRankNumber)
- with BatchPhysicalRel
- with LegacyBatchExecNode[RowData] {
+ with BatchPhysicalRel {
require(rankType == RankType.RANK, "Only RANK is supported now")
val (rankStart, rankEnd) = rankRange match {
@@ -84,7 +76,7 @@ class BatchExecRank(
}
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
- new BatchExecRank(
+ new BatchPhysicalRank(
cluster,
traitSet,
inputs.get(0),
@@ -235,54 +227,16 @@ class BatchExecRank(
}
}
- //~ ExecNode methods
-----------------------------------------------------------
-
- override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
-
- override protected def translateToPlanInternal(
- planner: BatchPlanner): Transformation[RowData] = {
- val input = getInputNodes.get(0).translateToPlan(planner)
- .asInstanceOf[Transformation[RowData]]
- val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
- val partitionBySortingKeys = partitionKey.toArray
- // The collation for the partition-by fields is inessential here, we only
use the
- // comparator to distinguish different groups.
- // (order[is_asc], null_is_last)
- val partitionBySortCollation = partitionBySortingKeys.map(_ => (true,
true))
-
- // The collation for the order-by fields is inessential here, we only use
the
- // comparator to distinguish order-by fields change.
- // (order[is_asc], null_is_last)
- val orderByCollation = orderKey.getFieldCollations.map(_ => (true,
true)).toArray
- val orderByKeys = orderKey.getFieldCollations.map(_.getFieldIndex).toArray
-
- val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
- //operator needn't cache data
- val operator = new RankOperator(
- ComparatorCodeGenerator.gen(
- planner.getTableConfig,
- "PartitionByComparator",
- partitionBySortingKeys,
- partitionBySortingKeys.map(inputType.getTypeAt),
- partitionBySortCollation.map(_._1),
- partitionBySortCollation.map(_._2)),
- ComparatorCodeGenerator.gen(
- planner.getTableConfig,
- "OrderByComparator",
- orderByKeys,
- orderByKeys.map(inputType.getTypeAt),
- orderByCollation.map(_._1),
- orderByCollation.map(_._2)),
+ override def translateToExecNode(): ExecNode[_] = {
+ new BatchExecRank(
+ partitionKey.toArray,
+ orderKey.getFieldCollations.map(_.getFieldIndex).toArray,
rankStart,
rankEnd,
- outputRankNumber)
-
- ExecNodeUtil.createOneInputTransformation(
- input,
- getRelDetailedDescription,
- SimpleOperatorFactory.of(operator),
- InternalTypeInfo.of(outputType),
- input.getParallelism,
- 0)
+ outputRankNumber,
+ ExecEdge.DEFAULT,
+ FlinkTypeFactory.toLogicalRowType(getRowType),
+ getRelDetailedDescription
+ )
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index fc75428..a1801e9 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -409,7 +409,7 @@ object FlinkBatchRuleSets {
BatchPhysicalLimitRule.INSTANCE,
BatchExecSortLimitRule.INSTANCE,
// rank
- BatchExecRankRule.INSTANCE,
+ BatchPhysicalRankRule.INSTANCE,
RemoveRedundantLocalRankRule.INSTANCE,
// expand
BatchPhysicalExpandRule.INSTANCE,
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
similarity index 90%
rename from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala
rename to
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
index 9571bb7..2031bc1 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecRankRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalRankRule.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange,
RankType}
@@ -36,18 +36,18 @@ import scala.collection.JavaConversions._
* Rule that matches [[FlinkLogicalRank]] with rank function and constant
rank range,
* and converts it to
* {{{
- * BatchExecRank (global)
+ * BatchPhysicalRank (global)
* +- BatchPhysicalExchange (singleton if partition keys is empty, else hash)
- * +- BatchExecRank (local)
+ * +- BatchPhysicalRank (local)
* +- input of rank
* }}}
*/
-class BatchExecRankRule
+class BatchPhysicalRankRule
extends ConverterRule(
classOf[FlinkLogicalRank],
FlinkConventions.LOGICAL,
FlinkConventions.BATCH_PHYSICAL,
- "BatchExecRankRule") {
+ "BatchPhysicalRankRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
@@ -70,9 +70,9 @@ class BatchExecRankRule
val localRequiredTraitSet = emptyTraits.replace(sortCollation)
val newLocalInput = RelOptRule.convert(rank.getInput,
localRequiredTraitSet)
- // create local BatchExecRank
+ // create local BatchPhysicalRank
val localRankRange = new ConstantRankRange(1, rankEnd) // local rank
always start from 1
- val localRank = new BatchExecRank(
+ val localRank = new BatchPhysicalRank(
cluster,
emptyTraits,
newLocalInput,
@@ -85,7 +85,7 @@ class BatchExecRankRule
isGlobal = false
)
- // create local BatchExecRank
+ // create global BatchPhysicalRank
val globalRequiredDistribution = if (rank.partitionKey.isEmpty) {
FlinkRelDistribution.SINGLETON
} else {
@@ -98,7 +98,7 @@ class BatchExecRankRule
// require SINGLETON or HASH exchange
val newGlobalInput = RelOptRule.convert(localRank, globalRequiredTraitSet)
- val globalRank = new BatchExecRank(
+ val globalRank = new BatchPhysicalRank(
cluster,
emptyTraits,
newGlobalInput,
@@ -114,6 +114,6 @@ class BatchExecRankRule
}
}
-object BatchExecRankRule {
- val INSTANCE: RelOptRule = new BatchExecRankRule
+object BatchPhysicalRankRule {
+ val INSTANCE: RelOptRule = new BatchPhysicalRankRule
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
index 7d8d488..50490c6 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.rules.physical.batch
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.calcite.plan.RelOptRule._
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
@@ -28,18 +28,18 @@ import org.apache.calcite.rel.RelNode
import scala.collection.JavaConversions._
/**
- * Planner rule that matches a global [[BatchExecRank]] on a local
[[BatchExecRank]],
- * and merge them into a global [[BatchExecRank]].
+ * Planner rule that matches a global [[BatchPhysicalRank]] on a local
[[BatchPhysicalRank]],
+ * and merge them into a global [[BatchPhysicalRank]].
*/
class RemoveRedundantLocalRankRule extends RelOptRule(
- operand(classOf[BatchExecRank],
- operand(classOf[BatchExecRank],
+ operand(classOf[BatchPhysicalRank],
+ operand(classOf[BatchPhysicalRank],
operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
"RemoveRedundantLocalRankRule") {
override def matches(call: RelOptRuleCall): Boolean = {
- val globalRank: BatchExecRank = call.rel(0)
- val localRank: BatchExecRank = call.rel(1)
+ val globalRank: BatchPhysicalRank = call.rel(0)
+ val localRank: BatchPhysicalRank = call.rel(1)
globalRank.isGlobal && !localRank.isGlobal &&
globalRank.rankType == localRank.rankType &&
globalRank.partitionKey == localRank.partitionKey &&
@@ -48,7 +48,7 @@ class RemoveRedundantLocalRankRule extends RelOptRule(
}
override def onMatch(call: RelOptRuleCall): Unit = {
- val globalRank: BatchExecRank = call.rel(0)
+ val globalRank: BatchPhysicalRank = call.rel(0)
val inputOfLocalRank: RelNode = call.rel(2)
val newGlobalRank = globalRank.copy(globalRank.getTraitSet,
List(inputOfLocalRank))
call.transformTo(newGlobalRank)
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index da04c77..b235993 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.flink.table.planner.plan.stats._
import org.apache.flink.table.planner.plan.utils.ColumnIntervalUtil
import org.apache.flink.table.planner.{JBoolean, JDouble}
@@ -331,7 +331,8 @@ class FlinkRelMdColumnIntervalTest extends
FlinkRelMdHandlerTestBase {
assertNull(mq.getColumnInterval(rank, 5))
assertNull(mq.getColumnInterval(rank, 6))
rank match {
- case r: BatchExecRank if !r.isGlobal => // local batch rank does not
output rank function
+ case r: BatchPhysicalRank if !r.isGlobal =>
+ // local batch rank does not output rank function
case _ => assertEquals(ValueInterval(bd(1), bd(5)),
mq.getColumnInterval(rank, 7))
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
index a698b12..d81e8cd 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.metadata
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
import org.apache.calcite.rel.metadata.RelMdUtil
@@ -307,7 +307,7 @@ class FlinkRelMdDistinctRowCountTest extends
FlinkRelMdHandlerTestBase {
assertEquals(2.0, mq.getDistinctRowCount(rank, ImmutableBitSet.of(5),
null))
assertEquals(null, mq.getDistinctRowCount(rank, ImmutableBitSet.of(6),
null))
rank match {
- case r: BatchExecRank if !r.isGlobal => // local rank does not
output rank func
+ case r: BatchPhysicalRank if !r.isGlobal => // local rank does not
output rank func
case _ =>
assertEquals(5.0, mq.getDistinctRowCount(rank,
ImmutableBitSet.of(7), null))
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index cbc71d4..c571a16 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -448,7 +448,7 @@ class FlinkRelMdHandlerTestBase {
outputRankNumber = true
)
- val batchLocalRank = new BatchExecRank(
+ val batchLocalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
studentBatchScan,
@@ -464,7 +464,7 @@ class FlinkRelMdHandlerTestBase {
val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true)
val batchExchange = new BatchPhysicalExchange(
cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank,
hash6)
- val batchGlobalRank = new BatchExecRank(
+ val batchGlobalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
batchExchange,
@@ -530,7 +530,7 @@ class FlinkRelMdHandlerTestBase {
outputRankNumber = true
)
- val batchLocalRank = new BatchExecRank(
+ val batchLocalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
studentBatchScan,
@@ -546,7 +546,7 @@ class FlinkRelMdHandlerTestBase {
val hash6 = FlinkRelDistribution.hash(Array(6), requireStrict = true)
val batchExchange = new BatchPhysicalExchange(
cluster, batchLocalRank.getTraitSet.replace(hash6), batchLocalRank,
hash6)
- val batchGlobalRank = new BatchExecRank(
+ val batchGlobalRank = new BatchPhysicalRank(
cluster,
batchPhysicalTraits,
batchExchange,
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
index db79714..e57c3ef 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.metadata
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecRank
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.ImmutableBitSet
@@ -179,7 +179,7 @@ class FlinkRelMdPopulationSizeTest extends
FlinkRelMdHandlerTestBase {
assertNull(mq.getPopulationSize(rank, ImmutableBitSet.of(6)))
assertEquals(50.0, mq.getPopulationSize(rank, ImmutableBitSet.of(0,
2)))
rank match {
- case r: BatchExecRank =>
+ case r: BatchPhysicalRank =>
// local batch rank does not output rank func
// TODO re-check this
if (r.isGlobal) {
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
index 906a7f8..dab451c 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalExpand
import
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalDataStreamTableScan,
FlinkLogicalExpand, FlinkLogicalOverAggregate}
-import
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecRank,
BatchPhysicalCalc}
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalRank,
BatchPhysicalCalc}
import org.apache.flink.table.planner.plan.utils.ExpandUtil
import com.google.common.collect.{ImmutableList, Lists}
@@ -196,7 +196,7 @@ class FlinkRelMdSelectivityTest extends
FlinkRelMdHandlerTestBase {
assertEquals(1.0 / 7.0, mq.getSelectivity(rank, condition1))
rank match {
- case r: BatchExecRank if !r.isGlobal => // batch local rank does not
output rank fun
+ case r: BatchPhysicalRank if !r.isGlobal => // batch local rank does
not output rank fun
case _ =>
// rk > 2
val condition2 =