This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36352a3a387e108c0087765d82e398b393111a7b
Author: godfreyhe <[email protected]>
AuthorDate: Thu Jan 7 15:30:54 2021 +0800

    [FLINK-20857][table-planner-blink] Introduce 
BatchPhysicalSortWindowAggregate & BatchPhysicalLocalSortWindowAggregate, and 
make BatchExecSortWindowAggregate extend only from ExecNode
    
    This closes #14574
---
 .../exec/batch/BatchExecSortWindowAggregate.java   | 150 +++++++++++++++++++++
 .../agg/batch/SortWindowCodeGenerator.scala        |  12 +-
 .../metadata/AggCallSelectivityEstimator.scala     |   4 +-
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |   2 +-
 .../planner/plan/metadata/FlinkRelMdSize.scala     |   2 +-
 .../batch/BatchExecSortWindowAggregateBase.scala   | 128 ------------------
 ...=> BatchPhysicalLocalSortWindowAggregate.scala} |  37 +++--
 ...cala => BatchPhysicalSortWindowAggregate.scala} |  39 ++++--
 ... => BatchPhysicalSortWindowAggregateBase.scala} |  61 ++++-----
 .../batch/BatchPhysicalWindowAggregateRule.scala   |   8 +-
 .../table/planner/plan/utils/FlinkRelMdUtil.scala  |  10 +-
 11 files changed, 240 insertions(+), 213 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java
new file mode 100644
index 0000000..ccf3109
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import 
org.apache.flink.table.planner.codegen.agg.batch.SortWindowCodeGenerator;
+import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedOperator;
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Batch {@link ExecNode} for sort-based window aggregate operator. */
+public class BatchExecSortWindowAggregate extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final int[] grouping;
+    private final int[] auxGrouping;
+    private final AggregateCall[] aggCalls;
+    private final LogicalWindow window;
+    private final int inputTimeFieldIndex;
+    private final boolean inputTimeIsDate;
+    private final PlannerNamedWindowProperty[] namedWindowProperties;
+    private final RowType aggInputRowType;
+    private final boolean enableAssignPane;
+    private final boolean isMerge;
+    private final boolean isFinal;
+
+    public BatchExecSortWindowAggregate(
+            int[] grouping,
+            int[] auxGrouping,
+            AggregateCall[] aggCalls,
+            LogicalWindow window,
+            int inputTimeFieldIndex,
+            boolean inputTimeIsDate,
+            PlannerNamedWindowProperty[] namedWindowProperties,
+            RowType aggInputRowType,
+            boolean enableAssignPane,
+            boolean isMerge,
+            boolean isFinal,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.grouping = grouping;
+        this.auxGrouping = auxGrouping;
+        this.aggCalls = aggCalls;
+        this.window = window;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.inputTimeIsDate = inputTimeIsDate;
+        this.namedWindowProperties = namedWindowProperties;
+        this.aggInputRowType = aggInputRowType;
+        this.enableAssignPane = enableAssignPane;
+        this.isMerge = isMerge;
+        this.isFinal = isFinal;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) 
getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = 
inputNode.translateToPlan(planner);
+
+        final AggregateInfoList aggInfos =
+                AggregateUtil.transformToBatchAggregateInfoList(
+                        aggInputRowType,
+                        
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        null,
+                        null);
+
+        final TableConfig tableConfig = planner.getTableConfig();
+        final int groupBufferLimitSize =
+                tableConfig
+                        .getConfiguration()
+                        
.getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
+
+        final Tuple2<Long, Long> windowSizeAndSlideSize = 
WindowCodeGenerator.getWindowDef(window);
+        final SortWindowCodeGenerator windowCodeGenerator =
+                new SortWindowCodeGenerator(
+                        new CodeGeneratorContext(tableConfig),
+                        planner.getRelBuilder(),
+                        window,
+                        inputTimeFieldIndex,
+                        inputTimeIsDate,
+                        
JavaScalaConversionUtil.toScala(Arrays.asList(namedWindowProperties)),
+                        aggInfos,
+                        (RowType) inputNode.getOutputType(),
+                        (RowType) getOutputType(),
+                        groupBufferLimitSize,
+                        0L,
+                        windowSizeAndSlideSize.f0,
+                        windowSizeAndSlideSize.f1,
+                        grouping,
+                        auxGrouping,
+                        enableAssignPane,
+                        isMerge,
+                        isFinal);
+
+        final GeneratedOperator<OneInputStreamOperator<RowData, RowData>> 
generatedOperator;
+        if (grouping.length == 0) {
+            generatedOperator = windowCodeGenerator.genWithoutKeys();
+        } else {
+            generatedOperator = windowCodeGenerator.genWithKeys();
+        }
+
+        return new OneInputTransformation<>(
+                inputTransform,
+                getDesc(),
+                new CodeGenOperatorFactory<>(generatedOperator),
+                InternalTypeInfo.of(getOutputType()),
+                inputTransform.getParallelism());
+    }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
index bce1f56..fbe1b91 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala
@@ -32,7 +32,6 @@ import 
org.apache.flink.table.runtime.operators.TableStreamOperator
 import org.apache.flink.table.runtime.operators.window.TimeWindow
 import org.apache.flink.table.types.logical.RowType
 
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.tools.RelBuilder
 
 /**
@@ -63,7 +62,6 @@ class SortWindowCodeGenerator(
     namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
     inputRowType: RowType,
-    inputType: RowType,
     outputType: RowType,
     buffLimitSize: Int,
     windowStart: Long,
@@ -125,7 +123,7 @@ class SortWindowCodeGenerator(
           slideSize,
           windowSize,
           inputTerm,
-          inputType,
+          inputRowType,
           outputType,
           windowsGrouping,
           windowElementType,
@@ -156,7 +154,7 @@ class SortWindowCodeGenerator(
     val className = if (isFinal) "SortWinAggWithoutKeys" else 
"LocalSortWinAggWithoutKeys"
     val baseClass = classOf[TableStreamOperator[_]].getName
     AggCodeGenHelper.generateOperator(
-      ctx, className, baseClass, processCode, endInputCode, inputType)
+      ctx, className, baseClass, processCode, endInputCode, inputRowType)
   }
 
   def genWithKeys(): GeneratedOperator[OneInputStreamOperator[RowData, 
RowData]] = {
@@ -169,7 +167,7 @@ class SortWindowCodeGenerator(
 
     val keyProjectionCode = 
ProjectionCodeGenerator.generateProjectionExpression(
       ctx,
-      inputType,
+      inputRowType,
       groupKeyRowType,
       grouping,
       inputTerm = inputTerm,
@@ -207,7 +205,7 @@ class SortWindowCodeGenerator(
           slideSize,
           windowSize,
           inputTerm,
-          inputType,
+          inputRowType,
           outputType,
           windowsGrouping,
           windowElementType,
@@ -254,7 +252,7 @@ class SortWindowCodeGenerator(
     val className = if (isFinal) "SortWinAggWithKeys" else 
"LocalSortWinAggWithKeys"
     val baseClass = classOf[TableStreamOperator[_]].getName
     AggCodeGenHelper.generateOperator(
-      ctx, className, baseClass, processCode, endInputCode, inputType)
+      ctx, className, baseClass, processCode, endInputCode, inputRowType)
   }
 
   private def choosePreAcc: Boolean = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
index 26eba34..ba595d7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.JDouble
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate,
 BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, 
BatchPhysicalWindowAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGroupAggregateBase,
 BatchPhysicalLocalHashWindowAggregate, BatchPhysicalLocalSortWindowAggregate, 
BatchPhysicalWindowAggregateBase}
 import org.apache.flink.table.planner.plan.stats._
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
 
@@ -65,7 +65,7 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: 
FlinkRelMetadataQuery)
       case rel: BatchPhysicalLocalHashWindowAggregate =>
         val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ 
rel.auxGrouping
         (fullGrouping, rel.getAggCallList)
-      case rel: BatchExecLocalSortWindowAggregate =>
+      case rel: BatchPhysicalLocalSortWindowAggregate =>
         val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ 
rel.auxGrouping
         (fullGrouping, rel.getAggCallList)
       case rel: BatchPhysicalWindowAggregateBase =>
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index b1eec16..f0a2b8e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -542,7 +542,7 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
       case agg: StreamPhysicalGroupWindowAggregate => agg.grouping
       case agg: BatchPhysicalGroupAggregateBase => agg.grouping ++ 
agg.auxGrouping
       case agg: Aggregate => AggregateUtil.checkAndGetFullGroupSet(agg)
-      case agg: BatchExecLocalSortWindowAggregate =>
+      case agg: BatchPhysicalLocalSortWindowAggregate =>
         // grouping + assignTs + auxGrouping
         agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping
       case agg: BatchPhysicalLocalHashWindowAggregate =>
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
index 199e5d4..eed5805 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
@@ -219,7 +219,7 @@ class FlinkRelMdSize private extends 
MetadataHandler[BuiltInMetadata.Size] {
           agg.auxGrouping.zipWithIndex.map {
             case (k, v) => k -> (agg.grouping.length + 1 + v)
           }.toMap
-      case agg: BatchExecLocalSortWindowAggregate =>
+      case agg: BatchPhysicalLocalSortWindowAggregate =>
         // local win-agg output type: grouping + assignTs + auxGrouping + 
aggCalls
         agg.grouping.zipWithIndex.toMap ++
           agg.auxGrouping.zipWithIndex.map {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
deleted file mode 100644
index 2395de8..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.physical.batch
-
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.functions.UserDefinedFunction
-import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext
-import 
org.apache.flink.table.planner.codegen.agg.batch.{SortWindowCodeGenerator, 
WindowCodeGenerator}
-import org.apache.flink.table.planner.delegation.BatchPlanner
-import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
-import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.LegacyBatchExecNode
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
-import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList
-import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-
-abstract class BatchExecSortWindowAggregateBase(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    outputRowType: RelDataType,
-    aggInputRowType: RelDataType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
-    window: LogicalWindow,
-    inputTimeFieldIndex: Int,
-    inputTimeIsDate: Boolean,
-    namedWindowProperties: Seq[PlannerNamedWindowProperty],
-    enableAssignPane: Boolean = false,
-    isMerge: Boolean,
-    isFinal: Boolean)
-  extends BatchPhysicalWindowAggregateBase(
-    cluster,
-    traitSet,
-    inputRel,
-    outputRowType,
-    grouping,
-    auxGrouping,
-    aggCallToAggFunction,
-    window,
-    namedWindowProperties,
-    enableAssignPane,
-    isMerge,
-    isFinal)
-  with LegacyBatchExecNode[RowData] {
-
-  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
-    val inputRowCnt = mq.getRowCount(getInput)
-    if (inputRowCnt == null) {
-      return null
-    }
-    // does not take pane optimization into consideration here
-    // sort is not done here
-    val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size
-    val averageRowSize: Double = mq.getAverageRowSize(this)
-    val memCost = averageRowSize
-    val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
-    costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost)
-  }
-
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val input = getInputNodes.get(0).translateToPlan(planner)
-        .asInstanceOf[Transformation[RowData]]
-    val ctx = CodeGeneratorContext(planner.getTableConfig)
-    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
-    val inputRowType = getInput().getRowType
-    val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
-
-    val aggInfos = transformToBatchAggregateInfoList(
-      FlinkTypeFactory.toLogicalRowType(aggInputRowType), 
aggCallToAggFunction.map(_._1))
-
-    val groupBufferLimitSize = 
planner.getTableConfig.getConfiguration.getInteger(
-      ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
-
-    val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
-
-    val generator = new SortWindowCodeGenerator(
-      ctx, planner.getRelBuilder, window, inputTimeFieldIndex,
-      inputTimeIsDate, namedWindowProperties,
-      aggInfos, inputType, inputType, outputType,
-      groupBufferLimitSize, 0L, windowSizeAndSlideSize.f0, 
windowSizeAndSlideSize.f1,
-      grouping, auxGrouping, enableAssignPane, isMerge, isFinal)
-    val generatedOperator = if (grouping.isEmpty) {
-      generator.genWithoutKeys()
-    } else {
-      generator.genWithKeys()
-    }
-    val operator = new CodeGenOperatorFactory[RowData](generatedOperator)
-    ExecNodeUtil.createOneInputTransformation(
-      input,
-      getRelDetailedDescription,
-      operator,
-      InternalTypeInfo.of(outputType),
-      input.getParallelism,
-      0)
-  }
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortWindowAggregate.scala
similarity index 70%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortWindowAggregate.scala
index 28a2ea3..e7bd594 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortWindowAggregate.scala
@@ -20,8 +20,10 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.functions.UserDefinedFunction
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortWindowAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
@@ -30,9 +32,8 @@ import org.apache.calcite.rel.core.AggregateCall
 
 import java.util
 
-import scala.collection.JavaConversions._
-
-class BatchExecLocalSortWindowAggregate(
+/** Batch physical RelNode for local sort-based window aggregate. */
+class BatchPhysicalLocalSortWindowAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -46,25 +47,22 @@ class BatchExecLocalSortWindowAggregate(
     inputTimeIsDate: Boolean,
     namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false)
-  extends BatchExecSortWindowAggregateBase(
+  extends BatchPhysicalSortWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    inputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    inputTimeFieldIndex,
-    inputTimeIsDate,
     namedWindowProperties,
     enableAssignPane,
     isMerge = false,
     isFinal = false) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new BatchExecLocalSortWindowAggregate(
+    new BatchPhysicalLocalSortWindowAggregate(
       cluster,
       traitSet,
       inputs.get(0),
@@ -80,7 +78,22 @@ class BatchExecLocalSortWindowAggregate(
       enableAssignPane)
   }
 
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecSortWindowAggregate(
+      grouping,
+      auxGrouping,
+      getAggCallList.toArray,
+      window,
+      inputTimeFieldIndex,
+      inputTimeIsDate,
+      namedWindowProperties.toArray,
+      FlinkTypeFactory.toLogicalRowType(inputRowType),
+      enableAssignPane,
+      false, // isMerge is always false
+      false, // isFinal is always false
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregate.scala
similarity index 72%
copy from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregate.scala
index d0552b7..8195149 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregate.scala
@@ -20,19 +20,18 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.functions.UserDefinedFunction
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortWindowAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 
-import java.util
-
-import scala.collection.JavaConversions._
-
-class BatchExecSortWindowAggregate(
+/** Batch physical RelNode for (global) sort-based window aggregate. */
+class BatchPhysicalSortWindowAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -47,25 +46,22 @@ class BatchExecSortWindowAggregate(
     namedWindowProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean)
-  extends BatchExecSortWindowAggregateBase(
+  extends BatchPhysicalSortWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    aggInputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    inputTimeFieldIndex,
-    inputTimeIsDate,
     namedWindowProperties,
     enableAssignPane,
     isMerge,
     isFinal = true) {
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new BatchExecSortWindowAggregate(
+    new BatchPhysicalSortWindowAggregate(
       cluster,
       traitSet,
       inputs.get(0),
@@ -82,7 +78,22 @@ class BatchExecSortWindowAggregate(
       isMerge)
   }
 
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecSortWindowAggregate(
+      grouping,
+      auxGrouping,
+      getAggCallList.toArray,
+      window,
+      inputTimeFieldIndex,
+      inputTimeIsDate,
+      namedWindowProperties.toArray,
+      FlinkTypeFactory.toLogicalRowType(aggInputRowType),
+      enableAssignPane,
+      isMerge,
+      true, // isFinal is always true
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregateBase.scala
similarity index 59%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregateBase.scala
index d0552b7..fde069b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregateBase.scala
@@ -20,69 +20,54 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.functions.UserDefinedFunction
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
+import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 
-import java.util
-
-import scala.collection.JavaConversions._
-
-class BatchExecSortWindowAggregate(
+/** Batch physical RelNode for sort-based window aggregate. */
+abstract class BatchPhysicalSortWindowAggregateBase(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
     outputRowType: RelDataType,
-    aggInputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
     aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
     window: LogicalWindow,
-    inputTimeFieldIndex: Int,
-    inputTimeIsDate: Boolean,
-    namedWindowProperties: Seq[PlannerNamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
-    isMerge: Boolean)
-  extends BatchExecSortWindowAggregateBase(
+    isMerge: Boolean,
+    isFinal: Boolean)
+  extends BatchPhysicalWindowAggregateBase(
     cluster,
     traitSet,
     inputRel,
     outputRowType,
-    aggInputRowType,
     grouping,
     auxGrouping,
     aggCallToAggFunction,
     window,
-    inputTimeFieldIndex,
-    inputTimeIsDate,
-    namedWindowProperties,
+    namedProperties,
     enableAssignPane,
     isMerge,
-    isFinal = true) {
+    isFinal) {
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    new BatchExecSortWindowAggregate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      aggInputRowType,
-      grouping,
-      auxGrouping,
-      aggCallToAggFunction,
-      window,
-      inputTimeFieldIndex,
-      inputTimeIsDate,
-      namedWindowProperties,
-      enableAssignPane,
-      isMerge)
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+    val inputRowCnt = mq.getRowCount(getInput)
+    if (inputRowCnt == null) {
+      return null
+    }
+    // does not take pane optimization into consideration here
+    // sort is not done here
+    val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size
+    val averageRowSize: Double = mq.getAverageRowSize(this)
+    val memCost = averageRowSize
+    val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
+    costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost)
   }
-
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
index ff79e51..e881a06 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate
-import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchExecSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate}
+import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalLocalSortWindowAggregate, BatchPhysicalSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate}
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
 import org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType
 import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
@@ -202,7 +202,7 @@ class BatchPhysicalWindowAggregateRule
         val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet)
         val localProvidedTraitSet = localRequiredTraitSet
 
-        new BatchExecLocalSortWindowAggregate(
+        new BatchPhysicalLocalSortWindowAggregate(
           agg.getCluster,
           localProvidedTraitSet,
           newLocalInput,
@@ -257,7 +257,7 @@ class BatchPhysicalWindowAggregateRule
          .replace(createRelCollation(groupSet.indices.toArray :+ groupSet.length))
         val newGlobalAggInput = RelOptRule.convert(localAgg, globalRequiredTraitSet)
 
-        new BatchExecSortWindowAggregate(
+        new BatchPhysicalSortWindowAggregate(
           agg.getCluster,
           aggProvidedTraitSet,
           newGlobalAggInput,
@@ -314,7 +314,7 @@ class BatchPhysicalWindowAggregateRule
           groupSet :+ inputTimeFieldIndex))
         val newInput = RelOptRule.convert(input, requiredTraitSet)
 
-        new BatchExecSortWindowAggregate(
+        new BatchPhysicalSortWindowAggregate(
           agg.getCluster,
           aggProvidedTraitSet,
           newInput,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
index 81b436e..3c27a1d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
-import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase}
+import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalLocalSortWindowAggregate, BatchPhysicalWindowAggregateBase}
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange}
 import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES
@@ -330,14 +330,12 @@ object FlinkRelMdUtil {
       groupKey: ImmutableBitSet,
       agg: SingleRel): (ImmutableBitSet, Array[AggregateCall]) = {
     val (aggCalls, fullGroupSet) = agg match {
-      case agg: BatchExecLocalSortWindowAggregate =>
+      case agg: BatchPhysicalLocalSortWindowAggregate =>
         // grouping + assignTs + auxGrouping
-        (agg.getAggCallList,
-          agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
+        (agg.getAggCallList, agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
       case agg: BatchPhysicalLocalHashWindowAggregate =>
         // grouping + assignTs + auxGrouping
-        (agg.getAggCallList,
-          agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
+        (agg.getAggCallList, agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping)
       case agg: BatchPhysicalWindowAggregateBase =>
         (agg.getAggCallList, agg.grouping ++ agg.auxGrouping)
       case agg: BatchPhysicalGroupAggregateBase =>

Reply via email to