This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45417078b3da3ee81113e393733b834e2b786056
Author: godfreyhe <[email protected]>
AuthorDate: Wed Dec 23 19:45:14 2020 +0800

    [FLINK-20737][table-planner-blink] Introduce StreamPhysicalPythonGroupTableAggregate, and make StreamExecPythonGroupTableAggregate only extended from ExecNode
    
    This closes #14478
---
 ...reamPhysicalPythonGroupTableAggregateRule.java} | 21 +++--
 .../StreamExecPythonGroupTableAggregate.scala      | 89 ++++++++++------------
 .../StreamPhysicalPythonGroupTableAggregate.scala  | 76 ++++++++++++++++++
 .../FlinkChangelogModeInferenceProgram.scala       |  2 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  2 +-
 5 files changed, 126 insertions(+), 64 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupTableAggregateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
similarity index 87%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupTableAggregateRule.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
index d8465c6..81bd1465 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupTableAggregateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java
@@ -22,9 +22,10 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.functions.python.PythonFunctionKind;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableAggregate;
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonGroupTableAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalPythonGroupTableAggregate;
 import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -35,22 +36,20 @@ import org.apache.calcite.rel.core.AggregateCall;
 
 import java.util.List;
 
-import scala.collection.JavaConverters;
-
 /**
  * Rule to convert a {@link FlinkLogicalTableAggregate} into a {@link
- * StreamExecPythonGroupTableAggregate}.
+ * StreamPhysicalPythonGroupTableAggregate}.
  */
-public class StreamExecPythonGroupTableAggregateRule extends ConverterRule {
+public class StreamPhysicalPythonGroupTableAggregateRule extends ConverterRule 
{
 
-    public static final RelOptRule INSTANCE = new 
StreamExecPythonGroupTableAggregateRule();
+    public static final RelOptRule INSTANCE = new 
StreamPhysicalPythonGroupTableAggregateRule();
 
-    public StreamExecPythonGroupTableAggregateRule() {
+    public StreamPhysicalPythonGroupTableAggregateRule() {
         super(
                 FlinkLogicalTableAggregate.class,
                 FlinkConventions.LOGICAL(),
                 FlinkConventions.STREAM_PHYSICAL(),
-                "StreamExecPythonGroupTableAggregateRule");
+                "StreamPhysicalPythonGroupTableAggregateRule");
     }
 
     @Override
@@ -106,14 +105,12 @@ public class StreamExecPythonGroupTableAggregateRule 
extends ConverterRule {
                 rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
         RelNode newInput = RelOptRule.convert(agg.getInput(), 
requiredTraitSet);
 
-        return new StreamExecPythonGroupTableAggregate(
+        return new StreamPhysicalPythonGroupTableAggregate(
                 rel.getCluster(),
                 providedTraitSet,
                 newInput,
                 rel.getRowType(),
                 agg.getGroupSet().toArray(),
-                
JavaConverters.asScalaIteratorConverter(agg.getAggCallList().iterator())
-                        .asScala()
-                        .toSeq());
+                JavaScalaConversionUtil.toScala(agg.getAggCallList()));
     }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupTableAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.scala
similarity index 72%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupTableAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.scala
index 1251043..ea38246 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupTableAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.scala
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
@@ -24,54 +25,41 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
+import org.apache.flink.table.planner.delegation.PlannerBase
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate
-import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, 
KeySelectorUtil}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, 
ExecNodeBase}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
KeySelectorUtil}
 import org.apache.flink.table.planner.typeutils.DataViewUtils.DataViewSpec
+import org.apache.flink.table.planner.utils.Logging
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.RowType
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 
-import java.util
+import java.util.Collections
 
 /**
-  * Stream physical RelNode for unbounded python group table aggregate.
-  */
+ * Stream [[ExecNode]] for unbounded python group table aggregate.
+ *
+ * <p>Note: This class can't be ported to Java yet,
+ * because a Java class can't extend a Scala interface with default implementation.
+ * FLINK-20750 will port this class to Java.
+ */
 class StreamExecPythonGroupTableAggregate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    outputRowType: RelDataType,
     grouping: Array[Int],
-    aggCalls: Seq[AggregateCall])
-  extends StreamPhysicalGroupTableAggregateBase(
-    cluster,
-    traitSet,
-    inputRel,
-    outputRowType,
-    grouping,
-    aggCalls)
-  with LegacyStreamExecNode[RowData]
-  with CommonExecPythonAggregate {
-
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new StreamExecPythonGroupTableAggregate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      outputRowType,
-      grouping,
-      aggCalls)
-  }
-
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
+    aggCalls: Seq[AggregateCall],
+    aggCallNeedRetractions: Array[Boolean],
+    generateUpdateBefore: Boolean,
+    needRetraction: Boolean,
+    inputEdge: ExecEdge,
+    outputType: RowType,
+    description: String)
+  extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), 
outputType, description)
+  with StreamExecNode[RowData]
+  with CommonExecPythonAggregate
+  with Logging {
+
+  override protected def translateToPlanInternal(planner: PlannerBase): 
Transformation[RowData] = {
     val tableConfig = planner.getTableConfig
 
     if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime < 0) {
@@ -80,14 +68,17 @@ class StreamExecPythonGroupTableAggregate(
         "state size. You may specify a retention time of 0 to not clean up the 
state.")
     }
 
-    val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-
-    val outRowType = FlinkTypeFactory.toLogicalRowType(outputRowType)
-    val inputRowType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
+    val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]]
+    val inputTransformation = inputNode.translateToPlan(planner)
 
-    val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
+    val inputRowType = inputNode.getOutputType.asInstanceOf[RowType]
 
+    val aggInfoList = AggregateUtil.transformToStreamAggregateInfoList(
+      inputRowType,
+      aggCalls,
+      aggCallNeedRetractions,
+      needRetraction,
+      isStateBackendDataViews = true)
     val inputCountIndex = aggInfoList.getIndexOfCountStar
 
     var (pythonFunctionInfos, dataViewSpecs) =
@@ -100,7 +91,7 @@ class StreamExecPythonGroupTableAggregate(
     val operator = getPythonTableAggregateFunctionOperator(
       getConfig(planner.getExecEnv, tableConfig),
       inputRowType,
-      outRowType,
+      outputType,
       pythonFunctionInfos,
       dataViewSpecs,
       tableConfig.getMinIdleStateRetentionTime,
@@ -109,16 +100,14 @@ class StreamExecPythonGroupTableAggregate(
       generateUpdateBefore,
       inputCountIndex)
 
-    val selector = KeySelectorUtil.getRowDataSelector(
-      grouping,
-      InternalTypeInfo.of(inputRowType))
+    val selector = KeySelectorUtil.getRowDataSelector(grouping, 
InternalTypeInfo.of(inputRowType))
 
     // partitioned aggregation
     val ret = new OneInputTransformation(
       inputTransformation,
-      getRelDetailedDescription,
+      getDesc,
       operator,
-      InternalTypeInfo.of(outRowType),
+      InternalTypeInfo.of(outputType),
       inputTransformation.getParallelism)
 
     if (inputsContainSingleton()) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupTableAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupTableAggregate.scala
new file mode 100644
index 0000000..9c373d1
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupTableAggregate.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
ChangelogPlanUtils}
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+
+import java.util
+
+/**
+ * Stream physical RelNode for unbounded python group table aggregate.
+ */
+class StreamPhysicalPythonGroupTableAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    grouping: Array[Int],
+    aggCalls: Seq[AggregateCall])
+  extends StreamPhysicalGroupTableAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    grouping,
+    aggCalls) {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+    new StreamPhysicalPythonGroupTableAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      grouping,
+      aggCalls)
+  }
+
+  override def translateToExecNode(): ExecNode[_] = {
+    val aggCallNeedRetractions =
+      AggregateUtil.deriveAggCallNeedRetractions(this, grouping.length, 
aggCalls)
+    val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
+    val needRetraction = !ChangelogPlanUtils.inputInsertOnly(this)
+    new StreamExecPythonGroupTableAggregate(
+      grouping,
+      aggCalls,
+      aggCallNeedRetractions,
+      generateUpdateBefore,
+      needRetraction,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index b7723fa..75f3a83 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -463,7 +463,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 
       case _: StreamPhysicalGroupAggregate | _: 
StreamPhysicalGroupTableAggregate |
            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
-           _: StreamExecPythonGroupTableAggregate =>
+           _: StreamPhysicalPythonGroupTableAggregate =>
         // Aggregate, TableAggregate and Limit requires update_before if there 
are updates
         val requiredChildTrait = 
beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
         val children = visitChildren(rel, requiredChildTrait)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d658e71..49ae726 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -423,7 +423,7 @@ object FlinkStreamRuleSets {
     StreamPhysicalGroupAggregateRule.INSTANCE,
     StreamPhysicalGroupTableAggregateRule.INSTANCE,
     StreamPhysicalPythonGroupAggregateRule.INSTANCE,
-    StreamExecPythonGroupTableAggregateRule.INSTANCE,
+    StreamPhysicalPythonGroupTableAggregateRule.INSTANCE,
     // over agg
     StreamExecOverAggregateRule.INSTANCE,
     StreamExecPythonOverAggregateRule.INSTANCE,

Reply via email to