This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8e591ad0461e67554ffc1bc9261055f9405ecbc
Author: godfreyhe <[email protected]>
AuthorDate: Thu Jan 7 14:48:38 2021 +0800

    [FLINK-20857][table-planner-blink] Introduce 
BatchPhysicalHashWindowAggregate & BatchPhysicalLocalHashWindowAggregate, and 
make BatchExecHashWindowAggregate only extended from ExecNode
    
    This closes #14574
---
 .../exec/batch/BatchExecHashWindowAggregate.java   | 149 +++++++++++++++++++++
 .../agg/batch/HashWindowCodeGenerator.scala        |   2 +-
 .../agg/batch/SortWindowCodeGenerator.scala        |   2 +-
 .../codegen/agg/batch/WindowCodeGenerator.scala    |  33 ++---
 .../metadata/AggCallSelectivityEstimator.scala     |   4 +-
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |   2 +-
 .../planner/plan/metadata/FlinkRelMdSize.scala     |   2 +-
 .../BatchExecPythonGroupWindowAggregate.scala      |   6 +-
 .../batch/BatchExecSortWindowAggregateBase.scala   |   6 +-
 ...cala => BatchPhysicalHashWindowAggregate.scala} |  42 +++---
 ... => BatchPhysicalHashWindowAggregateBase.scala} |  67 +--------
 ...=> BatchPhysicalLocalHashWindowAggregate.scala} |  39 ++++--
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   2 +-
 .../batch/BatchPhysicalSortLimitRule.scala         |   8 +-
 ...cala => BatchPhysicalWindowAggregateRule.scala} |  24 ++--
 .../batch/RemoveRedundantLocalHashAggRule.scala    |   2 +-
 .../batch/RemoveRedundantLocalSortAggRule.scala    |   2 +-
 .../table/planner/plan/utils/FlinkRelMdUtil.scala  |   6 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  18 +--
 19 files changed, 270 insertions(+), 146 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
new file mode 100644
index 0000000..d8b147d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashWindowAggregate.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import 
org.apache.flink.table.planner.codegen.agg.batch.HashWindowCodeGenerator;
+import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedOperator;
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Batch {@link ExecNode} for hash-based window aggregate operator. */
+public class BatchExecHashWindowAggregate extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final int[] grouping;
+    private final int[] auxGrouping;
+    private final AggregateCall[] aggCalls;
+    private final LogicalWindow window;
+    private final int inputTimeFieldIndex;
+    private final boolean inputTimeIsDate;
+    private final PlannerNamedWindowProperty[] namedWindowProperties;
+    private final RowType aggInputRowType;
+    private final boolean enableAssignPane;
+    private final boolean isMerge;
+    private final boolean isFinal;
+
+    public BatchExecHashWindowAggregate(
+            int[] grouping,
+            int[] auxGrouping,
+            AggregateCall[] aggCalls,
+            LogicalWindow window,
+            int inputTimeFieldIndex,
+            boolean inputTimeIsDate,
+            PlannerNamedWindowProperty[] namedWindowProperties,
+            RowType aggInputRowType,
+            boolean enableAssignPane,
+            boolean isMerge,
+            boolean isFinal,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.grouping = grouping;
+        this.auxGrouping = auxGrouping;
+        this.aggCalls = aggCalls;
+        this.window = window;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.inputTimeIsDate = inputTimeIsDate;
+        this.namedWindowProperties = namedWindowProperties;
+        this.aggInputRowType = aggInputRowType;
+        this.enableAssignPane = enableAssignPane;
+        this.isMerge = isMerge;
+        this.isFinal = isFinal;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) 
getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = 
inputNode.translateToPlan(planner);
+
+        final AggregateInfoList aggInfos =
+                AggregateUtil.transformToBatchAggregateInfoList(
+                        aggInputRowType,
+                        
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        null,
+                        null);
+        final TableConfig tableConfig = planner.getTableConfig();
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+        final HashWindowCodeGenerator hashWindowCodeGenerator =
+                new HashWindowCodeGenerator(
+                        new CodeGeneratorContext(tableConfig),
+                        planner.getRelBuilder(),
+                        window,
+                        inputTimeFieldIndex,
+                        inputTimeIsDate,
+                        
JavaScalaConversionUtil.toScala(Arrays.asList(namedWindowProperties)),
+                        aggInfos,
+                        inputRowType,
+                        grouping,
+                        auxGrouping,
+                        enableAssignPane,
+                        isMerge,
+                        isFinal);
+        final int groupBufferLimitSize =
+                tableConfig
+                        .getConfiguration()
+                        
.getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
+        final Tuple2<Long, Long> windowSizeAndSlideSize = 
WindowCodeGenerator.getWindowDef(window);
+        final GeneratedOperator<OneInputStreamOperator<RowData, RowData>> 
generatedOperator =
+                hashWindowCodeGenerator.gen(
+                        inputRowType,
+                        (RowType) getOutputType(),
+                        groupBufferLimitSize,
+                        0, // windowStart
+                        windowSizeAndSlideSize.f0,
+                        windowSizeAndSlideSize.f1);
+
+        final long managedMemory =
+                ExecNodeUtil.getMemorySize(
+                        tableConfig, 
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY);
+        return ExecNodeUtil.createOneInputTransformation(
+                inputTransform,
+                getDesc(),
+                new CodeGenOperatorFactory<>(generatedOperator),
+                InternalTypeInfo.of(getOutputType()),
+                inputTransform.getParallelism(),
+                managedMemory);
+    }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
index a1d0a65..76de099 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
@@ -72,7 +72,7 @@ class HashWindowCodeGenerator(
     inputTimeIsDate: Boolean,
     namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
-    inputRowType: RelDataType,
+    inputRowType: RowType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
     enableAssignPane: Boolean = true,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
index 3677b41..bce1f56 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
@@ -62,7 +62,7 @@ class SortWindowCodeGenerator(
     inputTimeIsDate: Boolean,
     namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
-    inputRowType: RelDataType,
+    inputRowType: RowType,
     inputType: RowType,
     outputType: RowType,
     buffLimitSize: Int,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
index b28c813..f30568c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
@@ -18,11 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.agg.batch
 
-import org.apache.calcite.avatica.util.DateTimeUtils
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
-import org.apache.commons.math3.util.ArithmeticUtils
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.utils.JoinedRowData
@@ -52,6 +48,11 @@ import 
org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot
 
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.tools.RelBuilder
+import org.apache.commons.math3.util.ArithmeticUtils
+
 import scala.collection.JavaConversions._
 
 abstract class WindowCodeGenerator(
@@ -61,14 +62,15 @@ abstract class WindowCodeGenerator(
     inputTimeIsDate: Boolean,
     namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
-    inputRowType: RelDataType,
+    inputRowType: RowType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
     enableAssignPane: Boolean = true,
     val isMerge: Boolean,
     val isFinal: Boolean) {
 
-  protected lazy val builder: RelBuilder = relBuilder.values(inputRowType)
+  protected lazy val builder: RelBuilder = relBuilder.values(
+    FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType))
 
   protected lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
 
@@ -79,14 +81,12 @@ abstract class WindowCodeGenerator(
     AggCodeGenHelper.getAggBufferNames(auxGrouping, aggInfos)
 
   protected lazy val aggBufferTypes: Array[Array[LogicalType]] = 
AggCodeGenHelper.getAggBufferTypes(
-    inputType,
+    inputRowType,
     auxGrouping,
     aggInfos)
 
-  protected lazy val groupKeyRowType: RowType = 
AggCodeGenHelper.projectRowType(inputType, grouping)
-
-  private lazy val inputType: RowType =
-    FlinkTypeFactory.toLogicalType(inputRowType).asInstanceOf[RowType]
+  protected lazy val groupKeyRowType: RowType =
+    AggCodeGenHelper.projectRowType(inputRowType, grouping)
 
   protected lazy val timestampInternalType: LogicalType =
     if (inputTimeIsDate) new IntType() else new BigIntType()
@@ -116,7 +116,7 @@ abstract class WindowCodeGenerator(
         (groupKeyTypes :+ timestampInternalType) ++ aggBuffTypes,
         ((groupKeyNames :+ "assignedTs$") ++ aggBuffNames).toArray)
     } else {
-      FlinkTypeFactory.toLogicalRowType(inputRowType)
+      inputRowType
     }
   }
 
@@ -680,7 +680,8 @@ abstract class WindowCodeGenerator(
         remainder)),
       literal(index * slideSize))
     exprCodegen.generateExpression(new 
CallExpressionResolver(relBuilder).resolve(expr).accept(
-      new ExpressionConverter(relBuilder.values(inputRowType))))
+      new ExpressionConverter(
+        
relBuilder.values(FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType)))))
   }
 
   def getGrouping: Array[Int] = grouping
@@ -726,7 +727,7 @@ abstract class WindowCodeGenerator(
 
 object WindowCodeGenerator {
 
-  def getWindowDef(window: LogicalWindow): (Long, Long) = {
+  def getWindowDef(window: LogicalWindow): JTuple2[JLong, JLong] = {
     val (windowSize, slideSize): (Long, Long) = window match {
       case TumblingGroupWindow(_, _, size) if isTimeIntervalLiteral(size) =>
         (asLong(size), asLong(size))
@@ -736,7 +737,7 @@ object WindowCodeGenerator {
         // count tumbling/sliding window and session window not supported now
         throw new UnsupportedOperationException(s"Window $window is not 
supported right now.")
     }
-    (windowSize, slideSize)
+    new JTuple2[JLong, JLong](windowSize, slideSize)
   }
 
   def asLong(expr: Expression): Long = extractValue(expr, classOf[JLong]).get()
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
index acc90d5..26eba34 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.JDouble
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate,
 BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, 
BatchPhysicalWindowAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate,
 BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, 
BatchPhysicalWindowAggregateBase}
 import org.apache.flink.table.planner.plan.stats._
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
 
@@ -62,7 +62,7 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: 
FlinkRelMetadataQuery)
         (rel.getGroupSet.toArray ++ auxGroupSet, otherAggCalls)
       case rel: BatchPhysicalGroupAggregateBase =>
         (rel.grouping ++ rel.auxGrouping, rel.getAggCallList)
-      case rel: BatchExecLocalHashWindowAggregate =>
+      case rel: BatchPhysicalLocalHashWindowAggregate =>
         val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ 
rel.auxGrouping
         (fullGrouping, rel.getAggCallList)
       case rel: BatchExecLocalSortWindowAggregate =>
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index 4b80a55..b1eec16 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -545,7 +545,7 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
       case agg: BatchExecLocalSortWindowAggregate =>
         // grouping + assignTs + auxGrouping
         agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
-      case agg: BatchExecLocalHashWindowAggregate =>
+      case agg: BatchPhysicalLocalHashWindowAggregate =>
         // grouping + assignTs + auxGrouping
         agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
       case agg: BatchPhysicalWindowAggregateBase => agg.grouping ++ 
agg.auxGrouping
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
index e408203..199e5d4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
@@ -213,7 +213,7 @@ class FlinkRelMdSize private extends 
MetadataHandler[BuiltInMetadata.Size] {
     val mapInputToOutput: Map[Int, Int] = windowAgg match {
       case agg: WindowAggregate =>
         AggregateUtil.checkAndGetFullGroupSet(agg).zipWithIndex.toMap
-      case agg: BatchExecLocalHashWindowAggregate =>
+      case agg: BatchPhysicalLocalHashWindowAggregate =>
         // local win-agg output type: grouping + assignTs + auxGrouping + 
aggCalls
         agg.grouping.zipWithIndex.toMap ++
           agg.auxGrouping.zipWithIndex.map {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
index ba84b24..a5e5d59 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
@@ -126,7 +126,7 @@ class BatchExecPythonGroupWindowAggregate(
     val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
     val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
 
-    val (windowSize: Long, slideSize: Long) = 
WindowCodeGenerator.getWindowDef(window)
+    val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
 
     val groupBufferLimitSize = 
planner.getTableConfig.getConfiguration.getInteger(
       ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
@@ -137,8 +137,8 @@ class BatchExecPythonGroupWindowAggregate(
       outputType,
       inputTimeFieldIndex,
       groupBufferLimitSize,
-      windowSize,
-      slideSize,
+      windowSizeAndSlideSize.f0,
+      windowSizeAndSlideSize.f1,
       getConfig(planner.getExecEnv, planner.getTableConfig))
 
     if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 5ad4827..2395de8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -103,13 +103,13 @@ abstract class BatchExecSortWindowAggregateBase(
     val groupBufferLimitSize = 
planner.getTableConfig.getConfiguration.getInteger(
       ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
 
-    val (windowSize: Long, slideSize: Long) = 
WindowCodeGenerator.getWindowDef(window)
+    val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
 
     val generator = new SortWindowCodeGenerator(
       ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
       inputTimeIsDate, namedWindowProperties,
-      aggInfos, inputRowType, inputType, outputType,
-      groupBufferLimitSize, 0L, windowSize, slideSize,
+      aggInfos, inputType, inputType, outputType,
+      groupBufferLimitSize, 0L, windowSizeAndSlideSize.f0, 
windowSizeAndSlideSize.f1,
       grouping, auxGrouping, enableAssignPane, isMerge, isFinal)
     val generatedOperator = if (grouping.isEmpty) {
       generator.genWithoutKeys()
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregate.scala
similarity index 71%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregate.scala
index f3ce9fe..6ec67fa 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregate.scala
@@ -20,8 +20,10 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.functions.UserDefinedFunction
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashWindowAggregate
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
@@ -30,9 +32,10 @@ import org.apache.calcite.rel.core.AggregateCall
 
 import java.util
 
-import scala.collection.JavaConversions._
-
-class BatchExecHashWindowAggregate(
+/**
+ * Batch physical RelNode for (global) hash-based window aggregate.
+ */
+class BatchPhysicalHashWindowAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -47,25 +50,22 @@ class BatchExecHashWindowAggregate(
     namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean)
-  extends BatchExecHashWindowAggregateBase(
+  extends BatchPhysicalHashWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    aggInputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    inputTimeFieldIndex,
-    inputTimeIsDate,
     namedWindowProperties,
     enableAssignPane,
     isMerge,
     isFinal = true) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new BatchExecHashWindowAggregate(
+    new BatchPhysicalHashWindowAggregate(
       cluster,
       traitSet,
       inputs.get(0),
@@ -82,10 +82,22 @@ class BatchExecHashWindowAggregate(
       isMerge)
   }
 
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(
-    ExecEdge.builder()
-      .damBehavior(ExecEdge.DamBehavior.END_INPUT)
-      .build())
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecHashWindowAggregate(
+      grouping,
+      auxGrouping,
+      getAggCallList.toArray,
+      window,
+      inputTimeFieldIndex,
+      inputTimeIsDate,
+      namedWindowProperties.toArray,
+      FlinkTypeFactory.toLogicalRowType(aggInputRowType),
+      enableAssignPane,
+      isMerge,
+      true, // isFinal is always true
+      ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build(),
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregateBase.scala
similarity index 54%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregateBase.scala
index 92bfeca..06f6fea 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashWindowAggregateBase.scala
@@ -18,44 +18,31 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.configuration.MemorySize
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.functions.UserDefinedFunction
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext
-import 
org.apache.flink.table.planner.codegen.agg.batch.{HashWindowCodeGenerator, 
WindowCodeGenerator}
-import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.LegacyBatchExecNode
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
-import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil
-import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.runtime.util.collections.binary.BytesMap
+
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.table.runtime.util.collections.binary.BytesMap
 
-abstract class BatchExecHashWindowAggregateBase(
+/**
+ * Batch physical RelNode for hash-based window aggregate.
+ */
+abstract class BatchPhysicalHashWindowAggregateBase(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    aggInputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
     aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
     window: LogicalWindow,
-    inputTimeFieldIndex: Int,
-    inputTimeIsDate: Boolean,
     namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean,
@@ -72,8 +59,7 @@ abstract class BatchExecHashWindowAggregateBase(
     namedWindowProperties,
     enableAssignPane,
     isMerge,
-    isFinal)
-  with LegacyBatchExecNode[RowData] {
+    isFinal) {
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
     val numOfGroupKey = grouping.length
@@ -96,43 +82,4 @@ abstract class BatchExecHashWindowAggregateBase(
     val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
     costFactory.makeCost(rowCnt, hashCpuCost + aggFunctionCpuCost, 0, 0, 
memCost)
   }
-
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val config = planner.getTableConfig
-    val input = getInputNodes.get(0).translateToPlan(planner)
-        .asInstanceOf[Transformation[RowData]]
-    val ctx = CodeGeneratorContext(config)
-    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
-    val inputRowType = getInput.getRowType
-    val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
-
-    val aggInfos = transformToBatchAggregateInfoList(
-      FlinkTypeFactory.toLogicalRowType(aggInputRowType), 
aggCallToAggFunction.map(_._1))
-
-    val groupBufferLimitSize = config.getConfiguration.getInteger(
-      ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
-
-    val (windowSize: Long, slideSize: Long) = 
WindowCodeGenerator.getWindowDef(window)
-
-    val generatedOperator = new HashWindowCodeGenerator(
-      ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
-      inputTimeIsDate, namedWindowProperties,
-      aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, 
isMerge, isFinal).gen(
-      inputType, outputType, groupBufferLimitSize, 0,
-      windowSize, slideSize)
-    val operator = new CodeGenOperatorFactory[RowData](generatedOperator)
-
-    val managedMemory = MemorySize.parse(config.getConfiguration.getString(
-      ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY)).getBytes
-    ExecNodeUtil.createOneInputTransformation(
-      input,
-      getRelDetailedDescription,
-      operator,
-      InternalTypeInfo.of(outputType),
-      input.getParallelism,
-      managedMemory)
-  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalHashWindowAggregate.scala
similarity index 70%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalHashWindowAggregate.scala
index 2e19199..937f941 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalHashWindowAggregate.scala
@@ -20,8 +20,10 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.functions.UserDefinedFunction
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashWindowAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
@@ -30,9 +32,10 @@ import org.apache.calcite.rel.core.AggregateCall
 
 import java.util
 
-import scala.collection.JavaConversions._
-
-class BatchExecLocalHashWindowAggregate(
+/**
+ * Batch physical RelNode for local hash-based window aggregate.
+ */
+class BatchPhysicalLocalHashWindowAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -46,25 +49,22 @@ class BatchExecLocalHashWindowAggregate(
     inputTimeIsDate: Boolean,
     namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false)
-  extends BatchExecHashWindowAggregateBase(
+  extends BatchPhysicalHashWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    inputTimeFieldIndex,
-    inputTimeIsDate,
     namedWindowProperties,
     enableAssignPane,
     isMerge = false,
     isFinal = false) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new BatchExecLocalHashWindowAggregate(
+    new BatchPhysicalLocalHashWindowAggregate(
       cluster,
       traitSet,
       inputs.get(0),
@@ -80,7 +80,22 @@ class BatchExecLocalHashWindowAggregate(
       enableAssignPane)
   }
 
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecHashWindowAggregate(
+      grouping,
+      auxGrouping,
+      getAggCallList.toArray,
+      window,
+      inputTimeFieldIndex,
+      inputTimeIsDate,
+      namedWindowProperties.toArray,
+      FlinkTypeFactory.toLogicalRowType(inputRowType),
+      enableAssignPane,
+      false, // isMerge is always false
+      false, // isFinal is always false
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 1f5809e..bd25c16 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -423,7 +423,7 @@ object FlinkBatchRuleSets {
     // over agg
     BatchExecOverAggregateRule.INSTANCE,
     // window agg
-    BatchExecWindowAggregateRule.INSTANCE,
+    BatchPhysicalWindowAggregateRule.INSTANCE,
     BatchExecPythonWindowAggregateRule.INSTANCE,
     // join
     BatchExecHashJoinRule.INSTANCE,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
index ef7c0ba..9296afd 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortLimitRule.scala
@@ -61,17 +61,17 @@ class BatchPhysicalSortLimitRule
 
   override def convert(rel: RelNode): RelNode = {
     val sort = rel.asInstanceOf[FlinkLogicalSort]
-    // create local BatchExecSortLimit
+    // create local BatchPhysicalSortLimit
     val localRequiredTrait = 
sort.getInput.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
     val localInput = RelOptRule.convert(sort.getInput, localRequiredTrait)
 
-    // if fetch is null, there is no need to create local BatchExecSortLimit
+    // if fetch is null, there is no need to create local 
BatchPhysicalSortLimit
     val inputOfExchange = if (sort.fetch != null) {
       val limit = SortUtil.getLimitEnd(sort.offset, sort.fetch)
       val rexBuilder = sort.getCluster.getRexBuilder
       val intType = 
rexBuilder.getTypeFactory.createSqlType(SqlTypeName.INTEGER)
       val providedLocalTraitSet = localRequiredTrait.replace(sort.getCollation)
-      // for local BatchExecSortLimit, offset is always 0, and fetch is `limit`
+      // for local BatchPhysicalSortLimit, offset is always 0, and fetch is 
`limit`
       new BatchPhysicalSortLimit(
         rel.getCluster,
         providedLocalTraitSet,
@@ -90,7 +90,7 @@ class BatchPhysicalSortLimitRule
       .replace(FlinkRelDistribution.SINGLETON)
     val newInput = RelOptRule.convert(inputOfExchange, requiredTrait)
 
-    // create global BatchExecSortLimit
+    // create global BatchPhysicalSortLimit
     val providedGlobalTraitSet = requiredTrait.replace(sort.getCollation)
     new BatchPhysicalSortLimit(
       rel.getCluster,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
similarity index 96%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
index bbd96ac..ff79e51 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.logical.{LogicalWindow, 
SlidingGroupWindow, TumblingGroupWindow}
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecHashWindowAggregate,
 BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, 
BatchExecSortWindowAggregate}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate,
 BatchExecSortWindowAggregate, BatchPhysicalHashWindowAggregate, 
BatchPhysicalLocalHashWindowAggregate}
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
 import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType
 import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
@@ -48,15 +48,15 @@ import scala.collection.JavaConversions._
 /**
  * Rule to convert a [[FlinkLogicalWindowAggregate]] into a
  * {{{
- *   BatchExecHash(or Sort)WindowAggregate (global)
+ *   BatchPhysicalHash(or Sort)WindowAggregate (global)
  *   +- BatchPhysicalExchange (hash by group keys if group keys is not empty, 
else singleton)
- *      +- BatchExecLocalHash(or Sort)WindowAggregate (local)
+ *      +- BatchPhysicalLocalHash(or Sort)WindowAggregate (local)
  *         +- input of window agg
  * }}}
  * when all aggregate functions are mergeable
  * and [[OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY]] is 
TWO_PHASE, or
  * {{{
- *   BatchExecHash(or Sort)WindowAggregate
+ *   BatchPhysicalHash(or Sort)WindowAggregate
  *   +- BatchPhysicalExchange (hash by group keys if group keys is not empty, 
else singleton)
  *      +- input of window agg
  * }}}
@@ -67,12 +67,12 @@ import scala.collection.JavaConversions._
  * this rule will try to create two possibilities above, and chooses the best 
one based on cost.
  * if all aggregate function buffer are fix length, the rule will choose hash 
window agg.
  */
-class BatchExecWindowAggregateRule
+class BatchPhysicalWindowAggregateRule
   extends RelOptRule(
     operand(classOf[FlinkLogicalWindowAggregate],
       operand(classOf[RelNode], any)),
     FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE,
-    "BatchExecWindowAggregateRule")
+    "BatchPhysicalWindowAggregateRule")
   with BatchPhysicalAggRuleBase {
 
   override def matches(call: RelOptRuleCall): Boolean = {
@@ -163,7 +163,7 @@ class BatchExecWindowAggregateRule
       input.getRowType, call.builder(), window.timeAttribute)
     val inputTimeFieldType = 
agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType
     val inputTimeIsDate = inputTimeFieldType.getSqlTypeName == SqlTypeName.DATE
-    // local-agg output order: groupset | assignTs | aucGroupSet | aggCalls
+    // local-agg output order: groupSet | assignTs | auxGroupSet | aggCalls
     val newInputTimeFieldIndexFromLocal = groupSet.length
 
     val config = 
input.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
@@ -180,7 +180,7 @@ class BatchExecWindowAggregateRule
         val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet)
         val localProvidedTraitSet = localRequiredTraitSet
 
-        new BatchExecLocalHashWindowAggregate(
+        new BatchPhysicalLocalHashWindowAggregate(
           agg.getCluster,
           localProvidedTraitSet,
           newLocalInput,
@@ -234,7 +234,7 @@ class BatchExecWindowAggregateRule
         // hash
         val newGlobalAggInput = RelOptRule.convert(localAgg, 
globalRequiredTraitSet)
 
-        new BatchExecHashWindowAggregate(
+        new BatchPhysicalHashWindowAggregate(
           agg.getCluster,
           aggProvidedTraitSet,
           newGlobalAggInput,
@@ -293,7 +293,7 @@ class BatchExecWindowAggregateRule
         // case 2: Sliding window without pane optimization
         val newInput = RelOptRule.convert(input, requiredTraitSet)
 
-        new BatchExecHashWindowAggregate(
+        new BatchPhysicalHashWindowAggregate(
           agg.getCluster,
           aggProvidedTraitSet,
           newInput,
@@ -430,6 +430,6 @@ class BatchExecWindowAggregateRule
   }
 }
 
-object BatchExecWindowAggregateRule {
-  val INSTANCE: RelOptRule = new BatchExecWindowAggregateRule
+object BatchPhysicalWindowAggregateRule {
+  val INSTANCE: RelOptRule = new BatchPhysicalWindowAggregateRule
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
index 4484f2e..0a711f4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
@@ -49,7 +49,7 @@ class RemoveRedundantLocalHashAggRule extends RelOptRule(
       localAgg.grouping,
       localAgg.auxGrouping,
       // Use the localAgg agg calls because the global agg call filters was 
removed,
-      // see BatchExecHashAggRule for details.
+      // see BatchPhysicalHashAggRule for details.
       localAgg.getAggCallToAggFunction,
       isMerge = false)
     call.transformTo(newGlobalAgg)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
index 32920f9..833cc08 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
@@ -48,7 +48,7 @@ abstract class RemoveRedundantLocalSortAggRule(
       localAgg.grouping,
       localAgg.auxGrouping,
       // Use the localAgg agg calls because the global agg call filters was 
removed,
-      // see BatchExecSortAggRule for details.
+      // see BatchPhysicalSortAggRule for details.
       localAgg.getAggCallToAggFunction,
       isMerge = false)
     call.transformTo(newGlobalAgg)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
index 2afbf0c..81b436e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate,
 BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, 
BatchPhysicalWindowAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate,
 BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, 
BatchPhysicalWindowAggregateBase}
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankRange}
 import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable
 import 
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES
@@ -334,7 +334,7 @@ object FlinkRelMdUtil {
         // grouping + assignTs + auxGrouping
         (agg.getAggCallList,
           agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
-      case agg: BatchExecLocalHashWindowAggregate =>
+      case agg: BatchPhysicalLocalHashWindowAggregate =>
         // grouping + assignTs + auxGrouping
         (agg.getAggCallList,
           agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
@@ -388,7 +388,7 @@ object FlinkRelMdUtil {
   }
 
   /**
-   * Split groupKeys on Aggregate/ BatchExecGroupAggregateBase/ 
BatchExecWindowAggregateBase
+   * Split groupKeys on Aggregate/ BatchPhysicalGroupAggregateBase/ 
BatchPhysicalWindowAggregateBase
    * into keys on aggregate's groupKey and aggregate's aggregateCalls.
    *
    * @param agg      the aggregate
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 5a5de0c..5015479 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -1244,7 +1244,7 @@ class FlinkRelMdHandlerTestBase {
         Array("count$0")).toList // agg calls
     val localWindowAggRowType = typeFactory.createStructType(
       localWindowAggTypes, localWindowAggNames)
-    val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate(
+    val batchLocalWindowAgg = new BatchPhysicalLocalHashWindowAggregate(
       batchCalc.getCluster,
       batchPhysicalTraits,
       batchCalc,
@@ -1260,7 +1260,7 @@ class FlinkRelMdHandlerTestBase {
       enableAssignPane = false)
     val batchExchange2 = new BatchPhysicalExchange(
       cluster, batchPhysicalTraits.replace(hash01), batchLocalWindowAgg, 
hash01)
-    val batchWindowAggWithLocal = new BatchExecHashWindowAggregate(
+    val batchWindowAggWithLocal = new BatchPhysicalHashWindowAggregate(
       cluster,
       batchPhysicalTraits,
       batchExchange2,
@@ -1277,7 +1277,7 @@ class FlinkRelMdHandlerTestBase {
       isMerge = true
     )
 
-    val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate(
+    val batchWindowAggWithoutLocal = new BatchPhysicalHashWindowAggregate(
       batchExchange1.getCluster,
       batchPhysicalTraits,
       batchExchange1,
@@ -1383,7 +1383,7 @@ class FlinkRelMdHandlerTestBase {
         Array("count$0")).toList // agg calls
     val localWindowAggRowType = typeFactory.createStructType(
       localWindowAggTypes, localWindowAggNames)
-    val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate(
+    val batchLocalWindowAgg = new BatchPhysicalLocalHashWindowAggregate(
       batchCalc.getCluster,
       batchPhysicalTraits,
       batchCalc,
@@ -1399,7 +1399,7 @@ class FlinkRelMdHandlerTestBase {
       enableAssignPane = false)
     val batchExchange2 = new BatchPhysicalExchange(
       cluster, batchPhysicalTraits.replace(hash1), batchLocalWindowAgg, hash1)
-    val batchWindowAggWithLocal = new BatchExecHashWindowAggregate(
+    val batchWindowAggWithLocal = new BatchPhysicalHashWindowAggregate(
       cluster,
       batchPhysicalTraits,
       batchExchange2,
@@ -1416,7 +1416,7 @@ class FlinkRelMdHandlerTestBase {
       isMerge = true
     )
 
-    val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate(
+    val batchWindowAggWithoutLocal = new BatchPhysicalHashWindowAggregate(
       batchExchange1.getCluster,
       batchPhysicalTraits,
       batchExchange1,
@@ -1527,7 +1527,7 @@ class FlinkRelMdHandlerTestBase {
         Array("count$0")).toList // agg calls
     val localWindowAggRowType = typeFactory.createStructType(
       localWindowAggTypes, localWindowAggNames)
-    val batchLocalWindowAggWithAuxGroup = new 
BatchExecLocalHashWindowAggregate(
+    val batchLocalWindowAggWithAuxGroup = new 
BatchPhysicalLocalHashWindowAggregate(
       batchCalc.getCluster,
       batchPhysicalTraits,
       batchCalc,
@@ -1543,7 +1543,7 @@ class FlinkRelMdHandlerTestBase {
       enableAssignPane = false)
     val batchExchange2 = new BatchPhysicalExchange(
       cluster, batchPhysicalTraits.replace(hash0), 
batchLocalWindowAggWithAuxGroup, hash0)
-    val batchWindowAggWithLocalWithAuxGroup = new BatchExecHashWindowAggregate(
+    val batchWindowAggWithLocalWithAuxGroup = new 
BatchPhysicalHashWindowAggregate(
       cluster,
       batchPhysicalTraits,
       batchExchange2,
@@ -1560,7 +1560,7 @@ class FlinkRelMdHandlerTestBase {
       isMerge = true
     )
 
-    val batchWindowAggWithoutLocalWithAuxGroup = new 
BatchExecHashWindowAggregate(
+    val batchWindowAggWithoutLocalWithAuxGroup = new 
BatchPhysicalHashWindowAggregate(
       batchExchange1.getCluster,
       batchPhysicalTraits,
       batchExchange1,

Reply via email to