This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81849341df4375173346eb9f9095d7bc78bd0da3
Author: godfreyhe <[email protected]>
AuthorDate: Thu Jan 7 15:46:34 2021 +0800

    [FLINK-20857][table-planner-blink] Introduce 
BatchPhysicalPythonGroupWindowAggregate, and make 
BatchExecPythonGroupWindowAggregate only extended from ExecNode
    
    This closes #14574
---
 ...=> BatchPhysicalPythonWindowAggregateRule.java} |  16 +--
 .../BatchExecPythonGroupWindowAggregate.scala      | 112 +++++---------------
 .../BatchPhysicalPythonGroupWindowAggregate.scala  | 113 +++++++++++++++++++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   2 +-
 4 files changed, 148 insertions(+), 95 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
similarity index 94%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
index ac33ec8..ffc9c6a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
 import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate;
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalPythonGroupWindowAggregate;
 import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
@@ -56,17 +56,17 @@ import scala.collection.Seq;
 
 /**
  * The physical rule is responsible for convert {@link 
FlinkLogicalWindowAggregate} to {@link
- * BatchExecPythonGroupWindowAggregate}.
+ * BatchPhysicalPythonGroupWindowAggregate}.
  */
-public class BatchExecPythonWindowAggregateRule extends RelOptRule {
+public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule {
 
-    public static final RelOptRule INSTANCE = new 
BatchExecPythonWindowAggregateRule();
+    public static final RelOptRule INSTANCE = new 
BatchPhysicalPythonWindowAggregateRule();
 
-    private BatchExecPythonWindowAggregateRule() {
+    private BatchPhysicalPythonWindowAggregateRule() {
         super(
                 operand(FlinkLogicalWindowAggregate.class, 
operand(RelNode.class, any())),
                 FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE(),
-                "BatchExecPythonWindowAggregateRule");
+                "BatchPhysicalPythonWindowAggregateRule");
     }
 
     @Override
@@ -146,8 +146,8 @@ public class BatchExecPythonWindowAggregateRule extends 
RelOptRule {
         requiredTraitSet = requiredTraitSet.replace(sortCollation);
 
         RelNode newInput = RelOptRule.convert(input, requiredTraitSet);
-        BatchExecPythonGroupWindowAggregate windowAgg =
-                new BatchExecPythonGroupWindowAggregate(
+        BatchPhysicalPythonGroupWindowAggregate windowAgg =
+                new BatchPhysicalPythonGroupWindowAggregate(
                         agg.getCluster(),
                         traitSet,
                         newInput,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
similarity index 62%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
index a5e5d59..52b8ba8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.physical.batch
+package org.apache.flink.table.planner.plan.nodes.exec.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
@@ -25,115 +25,55 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.RowData
-import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.functions.python.PythonFunctionInfo
 import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator
-import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, 
PlannerWindowEnd, PlannerWindowStart}
-import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.planner.plan.logical.LogicalWindow
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, 
LegacyBatchExecNode}
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, 
ExecNodeBase}
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.RowType
 
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.metadata.RelMetadataQuery
 
-import java.util
-
-import scala.collection.JavaConversions._
+import java.util.Collections
 
 /**
-  * Batch physical RelNode for group widow aggregate (Python user defined 
aggregate function).
-  */
+ * Batch [[ExecNode]] for group window aggregate (Python user defined aggregate 
function).
+ *
+ * <p>Note: This class can't be ported to Java for now,
+ * because java class can't extend scala interface with default implementation.
+ * FLINK-20858 will port this class to Java.
+ */
 class BatchExecPythonGroupWindowAggregate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    outputRowType: RelDataType,
-    inputRowType: RelDataType,
     grouping: Array[Int],
     auxGrouping: Array[Int],
-    aggCalls: Seq[AggregateCall],
-    aggFunctions: Array[UserDefinedFunction],
+    aggCalls: Array[AggregateCall],
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
-    inputTimeIsDate: Boolean,
-    namedWindowProperties: Seq[PlannerNamedWindowProperty])
-  extends BatchPhysicalWindowAggregateBase(
-    cluster,
-    traitSet,
-    inputRel,
-    outputRowType,
-    grouping,
-    auxGrouping,
-    aggCalls.zip(aggFunctions),
-    window,
-    namedWindowProperties,
-    false,
-    false,
-    true)
-  with LegacyBatchExecNode[RowData]
+    namedWindowProperties: Array[PlannerNamedWindowProperty],
+    inputEdge: ExecEdge,
+    outputType: RowType,
+    description: String)
+  extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), 
outputType, description)
+  with BatchExecNode[RowData]
   with CommonExecPythonAggregate {
 
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new BatchExecPythonGroupWindowAggregate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      outputRowType,
-      inputRowType,
-      grouping,
-      auxGrouping,
-      aggCalls,
-      aggFunctions,
-      window,
-      inputTimeFieldIndex,
-      inputTimeIsDate,
-      namedWindowProperties)
-  }
-
-  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
-    val inputRowCnt = mq.getRowCount(getInput)
-    if (inputRowCnt == null) {
-      return null
-    }
-    // does not take pane optimization into consideration here
-    // sort is not done here
-    val aggCallToAggFunction = aggCalls.zip(aggFunctions)
-    val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size
-    val averageRowSize: Double = mq.getAverageRowSize(this)
-    val memCost = averageRowSize
-    val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
-    costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost)
-  }
-
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val input = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
-    val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
+  override protected def translateToPlanInternal(planner: PlannerBase): 
Transformation[RowData] = {
+    val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]]
+    val inputTransform = inputNode.translateToPlan(planner)
 
     val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)
 
     val groupBufferLimitSize = 
planner.getTableConfig.getConfiguration.getInteger(
       ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)
 
-    val ret = createPythonOneInputTransformation(
-      input,
-      inputType,
+    val transform = createPythonOneInputTransformation(
+      inputTransform,
+      inputNode.getOutputType.asInstanceOf[RowType],
       outputType,
       inputTimeFieldIndex,
       groupBufferLimitSize,
@@ -142,9 +82,9 @@ class BatchExecPythonGroupWindowAggregate(
       getConfig(planner.getExecEnv, planner.getTableConfig))
 
     if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
-      ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
+      
transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
     }
-    ret
+    transform
   }
 
   private[this] def createPythonOneInputTransformation(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
new file mode 100644
index 0000000..a6e8c21
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.flink.table.functions.UserDefinedFunction
+import 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
+import org.apache.flink.table.planner.plan.logical.LogicalWindow
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util
+
+/**
+ * Batch physical RelNode for group window aggregate (Python user defined 
aggregate function).
+ */
+class BatchPhysicalPythonGroupWindowAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    grouping: Array[Int],
+    auxGrouping: Array[Int],
+    aggCalls: Seq[AggregateCall],
+    aggFunctions: Array[UserDefinedFunction],
+    window: LogicalWindow,
+    inputTimeFieldIndex: Int,
+    inputTimeIsDate: Boolean,
+    namedWindowProperties: Seq[PlannerNamedWindowProperty])
+  extends BatchPhysicalWindowAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    grouping,
+    auxGrouping,
+    aggCalls.zip(aggFunctions),
+    window,
+    namedWindowProperties,
+    false,
+    false,
+    true) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+    new BatchPhysicalPythonGroupWindowAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      grouping,
+      auxGrouping,
+      aggCalls,
+      aggFunctions,
+      window,
+      inputTimeFieldIndex,
+      inputTimeIsDate,
+      namedWindowProperties)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+    val inputRowCnt = mq.getRowCount(getInput)
+    if (inputRowCnt == null) {
+      return null
+    }
+    // does not take pane optimization into consideration here
+    // sort is not done here
+    val aggCallToAggFunction = aggCalls.zip(aggFunctions)
+    val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size
+    val averageRowSize: Double = mq.getAverageRowSize(this)
+    val memCost = averageRowSize
+    val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
+    costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost)
+  }
+
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecPythonGroupWindowAggregate(
+      grouping,
+      auxGrouping,
+      aggCalls.toArray,
+      window,
+      inputTimeFieldIndex,
+      namedWindowProperties.toArray,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index bd25c16..e7aad9a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -424,7 +424,7 @@ object FlinkBatchRuleSets {
     BatchExecOverAggregateRule.INSTANCE,
     // window agg
     BatchPhysicalWindowAggregateRule.INSTANCE,
-    BatchExecPythonWindowAggregateRule.INSTANCE,
+    BatchPhysicalPythonWindowAggregateRule.INSTANCE,
     // join
     BatchExecHashJoinRule.INSTANCE,
     BatchExecSortMergeJoinRule.INSTANCE,

Reply via email to