This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 290658283d95be42454953ea484fe39b5ae0b3b4
Author: godfreyhe <[email protected]>
AuthorDate: Wed Jan 6 21:12:01 2021 +0800

    [FLINK-20857][table-planner-blink] Rename BatchExecWindowAggregateBase to BatchPhysicalWindowAggregateBase and do some refactoring
    
    This closes #14574
---
 .../batch/BatchExecPythonWindowAggregateRule.java  |  1 -
 .../metadata/AggCallSelectivityEstimator.scala     | 10 +++----
 .../plan/metadata/FlinkRelMdColumnInterval.scala   | 10 +++----
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  6 ++--
 .../plan/metadata/FlinkRelMdDistinctRowCount.scala |  8 ++---
 .../plan/metadata/FlinkRelMdPopulationSize.scala   |  6 ++--
 .../planner/plan/metadata/FlinkRelMdRowCount.scala |  8 ++---
 .../plan/metadata/FlinkRelMdSelectivity.scala      |  8 ++---
 .../planner/plan/metadata/FlinkRelMdSize.scala     | 18 ++++++------
 .../plan/metadata/FlinkRelMdUniqueGroups.scala     |  7 ++---
 .../plan/metadata/FlinkRelMdUniqueKeys.scala       |  6 ++--
 .../batch/BatchExecHashWindowAggregate.scala       | 13 ++-------
 .../batch/BatchExecHashWindowAggregateBase.scala   | 14 ++++-----
 .../batch/BatchExecLocalHashWindowAggregate.scala  | 11 ++-----
 .../batch/BatchExecLocalSortWindowAggregate.scala  | 11 ++-----
 .../BatchExecPythonGroupWindowAggregate.scala      | 18 +++++-------
 .../batch/BatchExecSortWindowAggregate.scala       | 13 ++-------
 .../batch/BatchExecSortWindowAggregateBase.scala   | 15 ++++------
 ...cala => BatchPhysicalWindowAggregateBase.scala} | 26 +++++++----------
 .../batch/BatchExecWindowAggregateRule.scala       | 10 -------
 .../table/planner/plan/utils/FlinkRelMdUtil.scala  | 34 +++++++++++-----------
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 15 ----------
 22 files changed, 102 insertions(+), 166 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
index 958188d..ac33ec8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
@@ -149,7 +149,6 @@ public class BatchExecPythonWindowAggregateRule extends 
RelOptRule {
         BatchExecPythonGroupWindowAggregate windowAgg =
                 new BatchExecPythonGroupWindowAggregate(
                         agg.getCluster(),
-                        call.builder(),
                         traitSet,
                         newInput,
                         agg.getRowType(),
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
index d7b5418..acc90d5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.JDouble
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate,
 BatchExecLocalSortWindowAggregate, BatchExecWindowAggregateBase, 
BatchPhysicalGroupAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate,
 BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, 
BatchPhysicalWindowAggregateBase}
 import org.apache.flink.table.planner.plan.stats._
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
 
@@ -63,13 +63,13 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: 
FlinkRelMetadataQuery)
       case rel: BatchPhysicalGroupAggregateBase =>
         (rel.grouping ++ rel.auxGrouping, rel.getAggCallList)
       case rel: BatchExecLocalHashWindowAggregate =>
-        val fullGrouping = rel.getGrouping ++ Array(rel.inputTimeFieldIndex) 
++ rel.getAuxGrouping
+        val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ 
rel.auxGrouping
         (fullGrouping, rel.getAggCallList)
       case rel: BatchExecLocalSortWindowAggregate =>
-        val fullGrouping = rel.getGrouping ++ Array(rel.inputTimeFieldIndex) 
++ rel.getAuxGrouping
+        val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ 
rel.auxGrouping
         (fullGrouping, rel.getAggCallList)
-      case rel: BatchExecWindowAggregateBase =>
-        (rel.getGrouping ++ rel.getAuxGrouping, rel.getAggCallList)
+      case rel: BatchPhysicalWindowAggregateBase =>
+        (rel.grouping ++ rel.auxGrouping, rel.getAggCallList)
       case _ => throw new IllegalArgumentException(s"Cannot handle 
${agg.getRelTypeName}!")
     }
     require(outputIdx >= fullGrouping.length)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index 34f0ade..4b80a55 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -498,7 +498,7 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
    * @return interval of the given column on batch window Aggregate
    */
   def getColumnInterval(
-      agg: BatchExecWindowAggregateBase,
+      agg: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       index: Int): ValueInterval = estimateColumnIntervalOfAggregate(agg, mq, 
index)
 
@@ -544,11 +544,11 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
       case agg: Aggregate => AggregateUtil.checkAndGetFullGroupSet(agg)
       case agg: BatchExecLocalSortWindowAggregate =>
         // grouping + assignTs + auxGrouping
-        agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ agg.getAuxGrouping
+        agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
       case agg: BatchExecLocalHashWindowAggregate =>
         // grouping + assignTs + auxGrouping
-        agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ agg.getAuxGrouping
-      case agg: BatchExecWindowAggregateBase => agg.getGrouping ++ 
agg.getAuxGrouping
+        agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
+      case agg: BatchPhysicalWindowAggregateBase => agg.grouping ++ 
agg.auxGrouping
       case agg: TableAggregate => agg.getGroupSet.toArray
       case agg: StreamPhysicalGroupTableAggregate => agg.grouping
       case agg: StreamPhysicalGroupWindowTableAggregate => agg.grouping
@@ -642,7 +642,7 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
             } else {
               null
             }
-          case agg: BatchExecWindowAggregateBase if agg.getAggCallList.length 
> aggCallIndex =>
+          case agg: BatchPhysicalWindowAggregateBase if 
agg.getAggCallList.length > aggCallIndex =>
             agg.getAggCallList(aggCallIndex)
           case _ => null
         }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 5d42e11..c9ec8d8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -387,14 +387,14 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
   }
 
   def areColumnsUnique(
-      rel: BatchExecWindowAggregateBase,
+      rel: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
     if (rel.isFinal) {
       areColumnsUniqueOnWindowAggregate(
-        rel.getGrouping,
-        rel.getNamedProperties,
+        rel.grouping,
+        rel.namedWindowProperties,
         rel.getRowType.getFieldCount,
         mq,
         columns,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
index ee52362..b727c73 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
@@ -399,7 +399,7 @@ class FlinkRelMdDistinctRowCount private extends 
MetadataHandler[BuiltInMetadata
       FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate)
     case rel: BatchPhysicalGroupAggregateBase =>
       FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate)
-    case rel: BatchExecWindowAggregateBase =>
+    case rel: BatchPhysicalWindowAggregateBase =>
       FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate)
   }
 
@@ -427,7 +427,7 @@ class FlinkRelMdDistinctRowCount private extends 
MetadataHandler[BuiltInMetadata
   }
 
   def getDistinctRowCount(
-      rel: BatchExecWindowAggregateBase,
+      rel: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       groupKey: ImmutableBitSet,
       predicate: RexNode): JDouble = {
@@ -438,7 +438,7 @@ class FlinkRelMdDistinctRowCount private extends 
MetadataHandler[BuiltInMetadata
     }
 
     val newPredicate = if (rel.isFinal) {
-      val namedWindowStartIndex = rel.getRowType.getFieldCount - 
rel.getNamedProperties.size
+      val namedWindowStartIndex = rel.getRowType.getFieldCount - 
rel.namedWindowProperties.size
       val groupKeyFromNamedWindow = groupKey.toList.exists(_ >= 
namedWindowStartIndex)
       if (groupKeyFromNamedWindow) {
         // cannot estimate DistinctRowCount result when some group keys are 
from named windows
@@ -455,7 +455,7 @@ class FlinkRelMdDistinctRowCount private extends 
MetadataHandler[BuiltInMetadata
       }
     } else {
       // local window aggregate
-      val assignTsFieldIndex = rel.getGrouping.length
+      val assignTsFieldIndex = rel.grouping.length
       if (groupKey.toList.contains(assignTsFieldIndex)) {
         // groupKey contains `assignTs` fields
         return null
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala
index ac37076..1742c70 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala
@@ -290,11 +290,11 @@ class FlinkRelMdPopulationSize private extends 
MetadataHandler[BuiltInMetadata.P
   }
 
   def getPopulationSize(
-      rel: BatchExecWindowAggregateBase,
+      rel: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       groupKey: ImmutableBitSet): JDouble = {
     if (rel.isFinal) {
-      val namedWindowStartIndex = rel.getRowType.getFieldCount - 
rel.getNamedProperties.size
+      val namedWindowStartIndex = rel.getRowType.getFieldCount - 
rel.namedWindowProperties.size
       val groupKeyFromNamedWindow = groupKey.toList.exists(_ >= 
namedWindowStartIndex)
       if (groupKeyFromNamedWindow) {
         return null
@@ -306,7 +306,7 @@ class FlinkRelMdPopulationSize private extends 
MetadataHandler[BuiltInMetadata.P
       }
     } else {
       // local window aggregate
-      val assignTsFieldIndex = rel.getGrouping.length
+      val assignTsFieldIndex = rel.grouping.length
       if (groupKey.toList.contains(assignTsFieldIndex)) {
         // groupKey contains `assignTs` fields
         return null
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
index 32328c5..2d48db8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala
@@ -147,8 +147,8 @@ class FlinkRelMdRowCount private extends 
MetadataHandler[BuiltInMetadata.RowCoun
     val (grouping, isFinal, isMerge) = rel match {
       case agg: BatchPhysicalGroupAggregateBase =>
         (ImmutableBitSet.of(agg.grouping: _*), agg.isFinal, agg.isMerge)
-      case windowAgg: BatchExecWindowAggregateBase =>
-        (ImmutableBitSet.of(windowAgg.getGrouping: _*), windowAgg.isFinal, 
windowAgg.isMerge)
+      case windowAgg: BatchPhysicalWindowAggregateBase =>
+        (ImmutableBitSet.of(windowAgg.grouping: _*), windowAgg.isFinal, 
windowAgg.isMerge)
       case _ => throw new IllegalArgumentException(s"Unknown aggregate type 
${rel.getRelTypeName}!")
     }
     val ndvOfGroupKeysOnGlobalAgg: JDouble = if (grouping.isEmpty) {
@@ -199,10 +199,10 @@ class FlinkRelMdRowCount private extends 
MetadataHandler[BuiltInMetadata.RowCoun
     estimateRowCountOfWindowAgg(ndvOfGroupKeys, inputRowCount, rel.getWindow)
   }
 
-  def getRowCount(rel: BatchExecWindowAggregateBase, mq: RelMetadataQuery): 
JDouble = {
+  def getRowCount(rel: BatchPhysicalWindowAggregateBase, mq: 
RelMetadataQuery): JDouble = {
     val ndvOfGroupKeys = getRowCountOfBatchExecAgg(rel, mq)
     val inputRowCount = mq.getRowCount(rel.getInput)
-    estimateRowCountOfWindowAgg(ndvOfGroupKeys, inputRowCount, rel.getWindow)
+    estimateRowCountOfWindowAgg(ndvOfGroupKeys, inputRowCount, rel.window)
   }
 
   private def estimateRowCountOfWindowAgg(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala
index bc30bd0..9143aca 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala
@@ -110,7 +110,7 @@ class FlinkRelMdSelectivity private extends 
MetadataHandler[BuiltInMetadata.Sele
   }
 
   def getSelectivity(
-      rel: BatchExecWindowAggregateBase,
+      rel: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       predicate: RexNode): JDouble = {
     val newPredicate = if (rel.isFinal) {
@@ -131,12 +131,12 @@ class FlinkRelMdSelectivity private extends 
MetadataHandler[BuiltInMetadata.Sele
       val hasLocalAgg = agg match {
         case _: Aggregate => false
         case rel: BatchPhysicalGroupAggregateBase => rel.isFinal && rel.isMerge
-        case rel: BatchExecWindowAggregateBase => rel.isFinal && rel.isMerge
+        case rel: BatchPhysicalWindowAggregateBase => rel.isFinal && 
rel.isMerge
         case _ => throw new IllegalArgumentException(s"Cannot handle 
${agg.getRelTypeName}!")
       }
       if (hasLocalAgg) {
         val childPredicate = agg match {
-          case rel: BatchExecWindowAggregateBase =>
+          case rel: BatchPhysicalWindowAggregateBase =>
             // set the predicate as they correspond to local window aggregate
             FlinkRelMdUtil.setChildPredicateOfWinAgg(predicate, rel)
           case _ => predicate
@@ -149,7 +149,7 @@ class FlinkRelMdSelectivity private extends 
MetadataHandler[BuiltInMetadata.Sele
           FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate)
         case rel: BatchPhysicalGroupAggregateBase =>
           FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate)
-        case rel: BatchExecWindowAggregateBase =>
+        case rel: BatchPhysicalWindowAggregateBase =>
           FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate)
         case _ => throw new IllegalArgumentException(s"Cannot handle 
${agg.getRelTypeName}!")
       }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
index 024497f..e408203 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
@@ -202,7 +202,7 @@ class FlinkRelMdSize private extends 
MetadataHandler[BuiltInMetadata.Size] {
   }
 
   def averageColumnSizes(
-      rel: BatchExecWindowAggregateBase,
+      rel: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery): JList[JDouble] = {
     averageColumnSizesOfWindowAgg(rel, mq)
   }
@@ -215,18 +215,18 @@ class FlinkRelMdSize private extends 
MetadataHandler[BuiltInMetadata.Size] {
         AggregateUtil.checkAndGetFullGroupSet(agg).zipWithIndex.toMap
       case agg: BatchExecLocalHashWindowAggregate =>
         // local win-agg output type: grouping + assignTs + auxGrouping + 
aggCalls
-        agg.getGrouping.zipWithIndex.toMap ++
-          agg.getAuxGrouping.zipWithIndex.map {
-            case (k, v) => k -> (agg.getGrouping.length + 1 + v)
+        agg.grouping.zipWithIndex.toMap ++
+          agg.auxGrouping.zipWithIndex.map {
+            case (k, v) => k -> (agg.grouping.length + 1 + v)
           }.toMap
       case agg: BatchExecLocalSortWindowAggregate =>
         // local win-agg output type: grouping + assignTs + auxGrouping + 
aggCalls
-        agg.getGrouping.zipWithIndex.toMap ++
-          agg.getAuxGrouping.zipWithIndex.map {
-            case (k, v) => k -> (agg.getGrouping.length + 1 + v)
+        agg.grouping.zipWithIndex.toMap ++
+          agg.auxGrouping.zipWithIndex.map {
+            case (k, v) => k -> (agg.grouping.length + 1 + v)
           }.toMap
-      case agg: BatchExecWindowAggregateBase =>
-        (agg.getGrouping ++ agg.getAuxGrouping).zipWithIndex.toMap
+      case agg: BatchPhysicalWindowAggregateBase =>
+        (agg.grouping ++ agg.auxGrouping).zipWithIndex.toMap
       case _ => throw new IllegalArgumentException(s"Unknown node type 
${windowAgg.getRelTypeName}")
     }
     getColumnSizesFromInputOrType(windowAgg, mq, mapInputToOutput)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
index 8733d17..3a4934e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -264,12 +264,11 @@ class FlinkRelMdUniqueGroups private extends 
MetadataHandler[UniqueGroups] {
   }
 
   def getUniqueGroups(
-      agg: BatchExecWindowAggregateBase,
+      agg: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       columns: ImmutableBitSet): ImmutableBitSet = {
-    val grouping = agg.getGrouping
-    val namedProperties = agg.getNamedProperties
-    getUniqueGroupsOfWindowAgg(agg, grouping, agg.getAuxGrouping, 
namedProperties, mq, columns)
+    getUniqueGroupsOfWindowAgg(
+      agg, agg.grouping, agg.auxGrouping, agg.namedWindowProperties, mq, 
columns)
   }
 
   private def getUniqueGroupsOfWindowAgg(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 9a9d544..312ce30 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -384,14 +384,14 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
   }
 
   def getUniqueKeys(
-      rel: BatchExecWindowAggregateBase,
+      rel: BatchPhysicalWindowAggregateBase,
       mq: RelMetadataQuery,
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     if (rel.isFinal) {
       getUniqueKeysOnWindowAgg(
         rel.getRowType.getFieldCount,
-        rel.getNamedProperties,
-        rel.getGrouping,
+        rel.namedWindowProperties,
+        rel.grouping,
         mq,
         ignoreNulls)
     } else {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
index b8b4ecb..f3ce9fe 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
@@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
 
 import java.util
 
@@ -35,11 +34,9 @@ import scala.collection.JavaConversions._
 
 class BatchExecHashWindowAggregate(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    inputRowType: RelDataType,
     aggInputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
@@ -47,16 +44,14 @@ class BatchExecHashWindowAggregate(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean)
   extends BatchExecHashWindowAggregateBase(
     cluster,
-    relBuilder,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     aggInputRowType,
     grouping,
     auxGrouping,
@@ -64,7 +59,7 @@ class BatchExecHashWindowAggregate(
     window,
     inputTimeFieldIndex,
     inputTimeIsDate,
-    namedProperties,
+    namedWindowProperties,
     enableAssignPane,
     isMerge,
     isFinal = true) {
@@ -72,11 +67,9 @@ class BatchExecHashWindowAggregate(
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
     new BatchExecHashWindowAggregate(
       cluster,
-      relBuilder,
       traitSet,
       inputs.get(0),
       getRowType,
-      inputRowType,
       aggInputRowType,
       grouping,
       auxGrouping,
@@ -84,7 +77,7 @@ class BatchExecHashWindowAggregate(
       window,
       inputTimeFieldIndex,
       inputTimeIsDate,
-      namedProperties,
+      namedWindowProperties,
       enableAssignPane,
       isMerge)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 1fcd892..92bfeca 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -46,11 +46,9 @@ import 
org.apache.flink.table.runtime.util.collections.binary.BytesMap
 
 abstract class BatchExecHashWindowAggregateBase(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    inputRowType: RelDataType,
     aggInputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
@@ -58,21 +56,20 @@ abstract class BatchExecHashWindowAggregateBase(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean,
     isFinal: Boolean)
-  extends BatchExecWindowAggregateBase(
+  extends BatchPhysicalWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    namedProperties,
+    namedWindowProperties,
     enableAssignPane,
     isMerge,
     isFinal)
@@ -109,6 +106,7 @@ abstract class BatchExecHashWindowAggregateBase(
         .asInstanceOf[Transformation[RowData]]
     val ctx = CodeGeneratorContext(config)
     val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
+    val inputRowType = getInput.getRowType
     val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
 
     val aggInfos = transformToBatchAggregateInfoList(
@@ -120,8 +118,8 @@ abstract class BatchExecHashWindowAggregateBase(
     val (windowSize: Long, slideSize: Long) = 
WindowCodeGenerator.getWindowDef(window)
 
     val generatedOperator = new HashWindowCodeGenerator(
-      ctx, relBuilder, window, inputTimeFieldIndex,
-      inputTimeIsDate, namedProperties,
+      ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
+      inputTimeIsDate, namedWindowProperties,
       aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, 
isMerge, isFinal).gen(
       inputType, outputType, groupBufferLimitSize, 0,
       windowSize, slideSize)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
index c15d323..2e19199 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
@@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
 
 import java.util
 
@@ -35,7 +34,6 @@ import scala.collection.JavaConversions._
 
 class BatchExecLocalHashWindowAggregate(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
@@ -46,23 +44,21 @@ class BatchExecLocalHashWindowAggregate(
     window: LogicalWindow,
     val inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false)
   extends BatchExecHashWindowAggregateBase(
     cluster,
-    relBuilder,
     traitSet,
     inputRel,
     outputRowType,
     inputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
     inputTimeFieldIndex,
     inputTimeIsDate,
-    namedProperties,
+    namedWindowProperties,
     enableAssignPane,
     isMerge = false,
     isFinal = false) {
@@ -70,7 +66,6 @@ class BatchExecLocalHashWindowAggregate(
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
     new BatchExecLocalHashWindowAggregate(
       cluster,
-      relBuilder,
       traitSet,
       inputs.get(0),
       getRowType,
@@ -81,7 +76,7 @@ class BatchExecLocalHashWindowAggregate(
       window,
       inputTimeFieldIndex,
       inputTimeIsDate,
-      namedProperties,
+      namedWindowProperties,
       enableAssignPane)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
index e9a3e89..28a2ea3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
@@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
 
 import java.util
 
@@ -35,7 +34,6 @@ import scala.collection.JavaConversions._
 
 class BatchExecLocalSortWindowAggregate(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
@@ -46,23 +44,21 @@ class BatchExecLocalSortWindowAggregate(
     window: LogicalWindow,
     val inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false)
   extends BatchExecSortWindowAggregateBase(
     cluster,
-    relBuilder,
     traitSet,
     inputRel,
     outputRowType,
     inputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
     inputTimeFieldIndex,
     inputTimeIsDate,
-    namedProperties,
+    namedWindowProperties,
     enableAssignPane,
     isMerge = false,
     isFinal = false) {
@@ -70,7 +66,6 @@ class BatchExecLocalSortWindowAggregate(
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
     new BatchExecLocalSortWindowAggregate(
       cluster,
-      relBuilder,
       traitSet,
       inputs.get(0),
       outputRowType,
@@ -81,7 +76,7 @@ class BatchExecLocalSortWindowAggregate(
       window,
       inputTimeFieldIndex,
       inputTimeIsDate,
-      namedProperties,
+      namedWindowProperties,
       enableAssignPane)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
index ab3ce2c..ba84b24 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
@@ -45,7 +45,6 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.tools.RelBuilder
 
 import java.util
 
@@ -56,7 +55,6 @@ import scala.collection.JavaConversions._
   */
 class BatchExecPythonGroupWindowAggregate(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
@@ -68,18 +66,17 @@ class BatchExecPythonGroupWindowAggregate(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty])
-  extends BatchExecWindowAggregateBase(
+    namedWindowProperties: Seq[PlannerNamedWindowProperty])
+  extends BatchPhysicalWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCalls.zip(aggFunctions),
     window,
-    namedProperties,
+    namedWindowProperties,
     false,
     false,
     true)
@@ -89,7 +86,6 @@ class BatchExecPythonGroupWindowAggregate(
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
     new BatchExecPythonGroupWindowAggregate(
       cluster,
-      relBuilder,
       traitSet,
       inputs.get(0),
       outputRowType,
@@ -101,7 +97,7 @@ class BatchExecPythonGroupWindowAggregate(
       window,
       inputTimeFieldIndex,
       inputTimeIsDate,
-      namedProperties)
+      namedWindowProperties)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
@@ -160,7 +156,7 @@ class BatchExecPythonGroupWindowAggregate(
       windowSize: Long,
       slideSize: Long,
       config: Configuration): OneInputTransformation[RowData, RowData] = {
-    val namePropertyTypeArray = namedProperties.map {
+    val namePropertyTypeArray = namedWindowProperties.map {
       case PlannerNamedWindowProperty(_, p) => p match {
         case PlannerWindowStart(_) => 0
         case PlannerWindowEnd(_) => 1
@@ -199,7 +195,7 @@ class BatchExecPythonGroupWindowAggregate(
       maxLimitSize: Int,
       windowSize: Long,
       slideSize: Long,
-      namedProperties: Array[Int],
+      namedWindowProperties: Array[Int],
       udafInputOffsets: Array[Int],
       pythonFunctionInfos: Array[PythonFunctionInfo]): 
OneInputStreamOperator[RowData, RowData] = {
     val clazz = 
loadClass(ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME)
@@ -227,7 +223,7 @@ class BatchExecPythonGroupWindowAggregate(
       Integer.valueOf(maxLimitSize),
       java.lang.Long.valueOf(windowSize),
       java.lang.Long.valueOf(slideSize),
-      namedProperties,
+      namedWindowProperties,
       grouping,
       grouping ++ auxGrouping,
       udafInputOffsets)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
index 9eb0f97..d0552b7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
@@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
 
 import java.util
 
@@ -35,11 +34,9 @@ import scala.collection.JavaConversions._
 
 class BatchExecSortWindowAggregate(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    inputRowType: RelDataType,
     aggInputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
@@ -47,16 +44,14 @@ class BatchExecSortWindowAggregate(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean)
   extends BatchExecSortWindowAggregateBase(
     cluster,
-    relBuilder,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     aggInputRowType,
     grouping,
     auxGrouping,
@@ -64,7 +59,7 @@ class BatchExecSortWindowAggregate(
     window,
     inputTimeFieldIndex,
     inputTimeIsDate,
-    namedProperties,
+    namedWindowProperties,
     enableAssignPane,
     isMerge,
     isFinal = true) {
@@ -72,11 +67,9 @@ class BatchExecSortWindowAggregate(
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
     new BatchExecSortWindowAggregate(
       cluster,
-      relBuilder,
       traitSet,
       inputs.get(0),
       getRowType,
-      inputRowType,
       aggInputRowType,
       grouping,
       auxGrouping,
@@ -84,7 +77,7 @@ class BatchExecSortWindowAggregate(
       window,
       inputTimeFieldIndex,
       inputTimeIsDate,
-      namedProperties,
+      namedWindowProperties,
       enableAssignPane,
       isMerge)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 64c1b0e..5ad4827 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -40,15 +40,12 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.tools.RelBuilder
 
 abstract class BatchExecSortWindowAggregateBase(
     cluster: RelOptCluster,
-    relBuilder: RelBuilder,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    inputRowType: RelDataType,
     aggInputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
@@ -56,21 +53,20 @@ abstract class BatchExecSortWindowAggregateBase(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean,
     isFinal: Boolean)
-  extends BatchExecWindowAggregateBase(
+  extends BatchPhysicalWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    namedProperties,
+    namedWindowProperties,
     enableAssignPane,
     isMerge,
     isFinal)
@@ -98,6 +94,7 @@ abstract class BatchExecSortWindowAggregateBase(
         .asInstanceOf[Transformation[RowData]]
     val ctx = CodeGeneratorContext(planner.getTableConfig)
     val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
+    val inputRowType = getInput().getRowType
     val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
 
     val aggInfos = transformToBatchAggregateInfoList(
@@ -109,8 +106,8 @@ abstract class BatchExecSortWindowAggregateBase(
     val (windowSize: Long, slideSize: Long) = 
WindowCodeGenerator.getWindowDef(window)
 
     val generator = new SortWindowCodeGenerator(
-      ctx, relBuilder, window, inputTimeFieldIndex,
-      inputTimeIsDate, namedProperties,
+      ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
+      inputTimeIsDate, namedWindowProperties,
       aggInfos, inputRowType, inputType, outputType,
       groupBufferLimitSize, 0L, windowSize, slideSize,
       grouping, auxGrouping, enableAssignPane, isMerge, isFinal)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalWindowAggregateBase.scala
similarity index 83%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalWindowAggregateBase.scala
index 63ba6af..d87002b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalWindowAggregateBase.scala
@@ -29,17 +29,19 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 
-abstract class BatchExecWindowAggregateBase(
+/**
+ * Base batch group window aggregate physical node.
+ */
+abstract class BatchPhysicalWindowAggregateBase(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    inputRowType: RelDataType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
+    val grouping: Array[Int],
+    val auxGrouping: Array[Int],
     aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
-    window: LogicalWindow,
-    namedProperties: Seq[PlannerNamedWindowProperty],
+    val window: LogicalWindow,
+    val namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = true,
     val isMerge: Boolean,
     val isFinal: Boolean)
@@ -50,25 +52,19 @@ abstract class BatchExecWindowAggregateBase(
     throw new TableException("auxGrouping should be empty if grouping is 
empty.")
   }
 
-  def getGrouping: Array[Int] = grouping
-
-  def getAuxGrouping: Array[Int] = auxGrouping
-
-  def getWindow: LogicalWindow = window
-
-  def getNamedProperties: Seq[PlannerNamedWindowProperty] = namedProperties
-
   def getAggCallList: Seq[AggregateCall] = aggCallToAggFunction.map(_._1)
 
   override def deriveRowType(): RelDataType = outputRowType
 
   override def explainTerms(pw: RelWriter): RelWriter = {
+    val inputRowType = getInput.getRowType
     super.explainTerms(pw)
       .itemIf("groupBy", RelExplainUtil.fieldToString(grouping, inputRowType), 
grouping.nonEmpty)
       .itemIf("auxGrouping", RelExplainUtil.fieldToString(auxGrouping, 
inputRowType),
         auxGrouping.nonEmpty)
       .item("window", window)
-      .itemIf("properties", namedProperties.map(_.name).mkString(", "), 
namedProperties.nonEmpty)
+      .itemIf("properties",
+        namedWindowProperties.map(_.name).mkString(", "), 
namedWindowProperties.nonEmpty)
       .item("select", RelExplainUtil.windowAggregationToString(
         inputRowType,
         grouping,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
index 1c7b2bb..bbd96ac 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
@@ -182,7 +182,6 @@ class BatchExecWindowAggregateRule
 
         new BatchExecLocalHashWindowAggregate(
           agg.getCluster,
-          call.builder(),
           localProvidedTraitSet,
           newLocalInput,
           localAggRelType,
@@ -205,7 +204,6 @@ class BatchExecWindowAggregateRule
 
         new BatchExecLocalSortWindowAggregate(
           agg.getCluster,
-          call.builder(),
           localProvidedTraitSet,
           newLocalInput,
           localAggRelType,
@@ -238,11 +236,9 @@ class BatchExecWindowAggregateRule
 
         new BatchExecHashWindowAggregate(
           agg.getCluster,
-          call.builder(),
           aggProvidedTraitSet,
           newGlobalAggInput,
           agg.getRowType,
-          newGlobalAggInput.getRowType,
           input.getRowType,
           groupSet.indices.toArray,
           // auxGroupSet starts from `size of groupSet + 1(assignTs)`
@@ -263,11 +259,9 @@ class BatchExecWindowAggregateRule
 
         new BatchExecSortWindowAggregate(
           agg.getCluster,
-          call.builder(),
           aggProvidedTraitSet,
           newGlobalAggInput,
           agg.getRowType,
-          newGlobalAggInput.getRowType,
           input.getRowType,
           groupSet.indices.toArray,
           // auxGroupSet starts from `size of groupSet + 1(assignTs)`
@@ -301,12 +295,10 @@ class BatchExecWindowAggregateRule
 
         new BatchExecHashWindowAggregate(
           agg.getCluster,
-          call.builder(),
           aggProvidedTraitSet,
           newInput,
           agg.getRowType,
           newInput.getRowType,
-          newInput.getRowType,
           groupSet,
           auxGroupSet,
           aggCallToAggFunction,
@@ -324,12 +316,10 @@ class BatchExecWindowAggregateRule
 
         new BatchExecSortWindowAggregate(
           agg.getCluster,
-          call.builder(),
           aggProvidedTraitSet,
           newInput,
           agg.getRowType,
           newInput.getRowType,
-          newInput.getRowType,
           groupSet,
           auxGroupSet,
           aggCallToAggFunction,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
index 321abcd..2afbf0c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate,
 BatchExecLocalSortWindowAggregate, BatchExecWindowAggregateBase, 
BatchPhysicalGroupAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate,
 BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, 
BatchPhysicalWindowAggregateBase}
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankRange}
 import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable
 import 
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES
@@ -174,12 +174,12 @@ object FlinkRelMdUtil {
    *         a predicate that stores NamedProperties predicate's selectivity
    */
   def makeNamePropertiesSelectivityRexNode(
-      globalWinAgg: BatchExecWindowAggregateBase,
+      globalWinAgg: BatchPhysicalWindowAggregateBase,
       predicate: RexNode): RexNode = {
     require(globalWinAgg.isFinal, "local window agg does not contain 
NamedProperties!")
-    val fullGrouping = globalWinAgg.getGrouping ++ globalWinAgg.getAuxGrouping
+    val fullGrouping = globalWinAgg.grouping ++ globalWinAgg.auxGrouping
     makeNamePropertiesSelectivityRexNode(
-      globalWinAgg, fullGrouping, globalWinAgg.getNamedProperties, predicate)
+      globalWinAgg, fullGrouping, globalWinAgg.namedWindowProperties, 
predicate)
   }
 
   /**
@@ -321,7 +321,7 @@ object FlinkRelMdUtil {
    */
   def setAggChildKeys(
       groupKey: ImmutableBitSet,
-      aggRel: BatchExecWindowAggregateBase): (ImmutableBitSet, 
Array[AggregateCall]) = {
+      aggRel: BatchPhysicalWindowAggregateBase): (ImmutableBitSet, 
Array[AggregateCall]) = {
     require(!aggRel.isFinal || !aggRel.isMerge, "Cannot handle global agg 
which has local agg!")
     setChildKeysOfAgg(groupKey, aggRel)
   }
@@ -333,13 +333,13 @@ object FlinkRelMdUtil {
       case agg: BatchExecLocalSortWindowAggregate =>
         // grouping + assignTs + auxGrouping
         (agg.getAggCallList,
-          agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ 
agg.getAuxGrouping)
+          agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
       case agg: BatchExecLocalHashWindowAggregate =>
         // grouping + assignTs + auxGrouping
         (agg.getAggCallList,
-          agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ 
agg.getAuxGrouping)
-      case agg: BatchExecWindowAggregateBase =>
-        (agg.getAggCallList, agg.getGrouping ++ agg.getAuxGrouping)
+          agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
+      case agg: BatchPhysicalWindowAggregateBase =>
+        (agg.getAggCallList, agg.grouping ++ agg.auxGrouping)
       case agg: BatchPhysicalGroupAggregateBase =>
         (agg.getAggCallList, agg.grouping ++ agg.auxGrouping)
       case _ => throw new IllegalArgumentException(s"Unknown aggregate: 
${agg.getRelTypeName}")
@@ -373,11 +373,11 @@ object FlinkRelMdUtil {
    */
   def setChildKeysOfWinAgg(
       groupKey: ImmutableBitSet,
-      globalWinAgg: BatchExecWindowAggregateBase): ImmutableBitSet = {
+      globalWinAgg: BatchPhysicalWindowAggregateBase): ImmutableBitSet = {
     require(globalWinAgg.isMerge, "Cannot handle global agg which does not 
have local window agg!")
     val childKeyBuilder = ImmutableBitSet.builder
     groupKey.toArray.foreach { key =>
-      if (key < globalWinAgg.getGrouping.length) {
+      if (key < globalWinAgg.grouping.length) {
         childKeyBuilder.set(key)
       } else {
         // skips `assignTs`
@@ -422,9 +422,9 @@ object FlinkRelMdUtil {
         val (childKeys, aggCalls) = setAggChildKeys(groupKey, rel)
         val childKeyExcludeAuxKey = removeAuxKey(childKeys, rel.grouping, 
rel.auxGrouping)
         (childKeyExcludeAuxKey, aggCalls)
-      case rel: BatchExecWindowAggregateBase =>
+      case rel: BatchPhysicalWindowAggregateBase =>
         val (childKeys, aggCalls) = setAggChildKeys(groupKey, rel)
-        val childKeyExcludeAuxKey = removeAuxKey(childKeys, rel.getGrouping, 
rel.getAuxGrouping)
+        val childKeyExcludeAuxKey = removeAuxKey(childKeys, rel.grouping, 
rel.auxGrouping)
         (childKeyExcludeAuxKey, aggCalls)
       case _ => throw new IllegalArgumentException(s"Unknown aggregate: 
${agg.getRelTypeName}.")
     }
@@ -471,9 +471,9 @@ object FlinkRelMdUtil {
    *         Note, pushable condition will be converted based on the input 
field position.
    */
   def splitPredicateOnAggregate(
-      agg: BatchExecWindowAggregateBase,
+      agg: BatchPhysicalWindowAggregateBase,
       predicate: RexNode): (Option[RexNode], Option[RexNode]) = {
-    splitPredicateOnAgg(agg.getGrouping ++ agg.getAuxGrouping, agg, predicate)
+    splitPredicateOnAgg(agg.grouping ++ agg.auxGrouping, agg, predicate)
   }
 
   /**
@@ -488,13 +488,13 @@ object FlinkRelMdUtil {
    */
   def setChildPredicateOfWinAgg(
       predicate: RexNode,
-      globalWinAgg: BatchExecWindowAggregateBase): RexNode = {
+      globalWinAgg: BatchPhysicalWindowAggregateBase): RexNode = {
     require(globalWinAgg.isMerge, "Cannot handle global agg which does not 
have local window agg!")
     if (predicate == null) {
       return null
     }
     // grouping + assignTs + auxGrouping
-    val fullGrouping = globalWinAgg.getGrouping ++ globalWinAgg.getAuxGrouping
+    val fullGrouping = globalWinAgg.grouping ++ globalWinAgg.auxGrouping
     // skips `assignTs`
     RexUtil.shift(predicate, fullGrouping.length, 1)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index cd3e5c0..5a5de0c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -1246,7 +1246,6 @@ class FlinkRelMdHandlerTestBase {
       localWindowAggTypes, localWindowAggNames)
     val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate(
       batchCalc.getCluster,
-      relBuilder,
       batchPhysicalTraits,
       batchCalc,
       localWindowAggRowType,
@@ -1263,11 +1262,9 @@ class FlinkRelMdHandlerTestBase {
       cluster, batchPhysicalTraits.replace(hash01), batchLocalWindowAgg, 
hash01)
     val batchWindowAggWithLocal = new BatchExecHashWindowAggregate(
       cluster,
-      relBuilder,
       batchPhysicalTraits,
       batchExchange2,
       flinkLogicalWindowAgg.getRowType,
-      batchExchange2.getRowType,
       batchCalc.getRowType,
       Array(0, 1),
       Array.empty,
@@ -1282,12 +1279,10 @@ class FlinkRelMdHandlerTestBase {
 
     val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate(
       batchExchange1.getCluster,
-      relBuilder,
       batchPhysicalTraits,
       batchExchange1,
       flinkLogicalWindowAgg.getRowType,
       batchExchange1.getRowType,
-      batchExchange1.getRowType,
       Array(0, 1),
       Array.empty,
       aggCallToAggFunction,
@@ -1390,7 +1385,6 @@ class FlinkRelMdHandlerTestBase {
       localWindowAggTypes, localWindowAggNames)
     val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate(
       batchCalc.getCluster,
-      relBuilder,
       batchPhysicalTraits,
       batchCalc,
       localWindowAggRowType,
@@ -1407,11 +1401,9 @@ class FlinkRelMdHandlerTestBase {
       cluster, batchPhysicalTraits.replace(hash1), batchLocalWindowAgg, hash1)
     val batchWindowAggWithLocal = new BatchExecHashWindowAggregate(
       cluster,
-      relBuilder,
       batchPhysicalTraits,
       batchExchange2,
       flinkLogicalWindowAgg.getRowType,
-      batchExchange2.getRowType,
       batchCalc.getRowType,
       Array(0),
       Array.empty,
@@ -1426,12 +1418,10 @@ class FlinkRelMdHandlerTestBase {
 
     val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate(
       batchExchange1.getCluster,
-      relBuilder,
       batchPhysicalTraits,
       batchExchange1,
       flinkLogicalWindowAgg.getRowType,
       batchExchange1.getRowType,
-      batchExchange1.getRowType,
       Array(1),
       Array.empty,
       aggCallToAggFunction,
@@ -1539,7 +1529,6 @@ class FlinkRelMdHandlerTestBase {
       localWindowAggTypes, localWindowAggNames)
     val batchLocalWindowAggWithAuxGroup = new 
BatchExecLocalHashWindowAggregate(
       batchCalc.getCluster,
-      relBuilder,
       batchPhysicalTraits,
       batchCalc,
       localWindowAggRowType,
@@ -1556,11 +1545,9 @@ class FlinkRelMdHandlerTestBase {
       cluster, batchPhysicalTraits.replace(hash0), 
batchLocalWindowAggWithAuxGroup, hash0)
     val batchWindowAggWithLocalWithAuxGroup = new BatchExecHashWindowAggregate(
       cluster,
-      relBuilder,
       batchPhysicalTraits,
       batchExchange2,
       flinkLogicalWindowAggWithAuxGroup.getRowType,
-      batchExchange2.getRowType,
       batchCalc.getRowType,
       Array(0),
       Array(2), // local output grouping keys: grouping + assignTs + 
auxGrouping
@@ -1575,12 +1562,10 @@ class FlinkRelMdHandlerTestBase {
 
     val batchWindowAggWithoutLocalWithAuxGroup = new 
BatchExecHashWindowAggregate(
       batchExchange1.getCluster,
-      relBuilder,
       batchPhysicalTraits,
       batchExchange1,
       flinkLogicalWindowAggWithAuxGroup.getRowType,
       batchExchange1.getRowType,
-      batchExchange1.getRowType,
       Array(0),
       Array(1),
       aggCallToAggFunction,

Reply via email to