This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63875b3c24ece384fd4e57d8e8ad5de9b27d3789
Author: godfreyhe <[email protected]>
AuthorDate: Tue Jan 5 20:00:24 2021 +0800

    [FLINK-20738][table-planner-blink] Introduce 
BatchPhysicalLocalSortAggregate, and make BatchPhysicalLocalSortAggregate only 
extended from FlinkPhysicalRel
    
    This closes #14562
---
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |  2 +-
 ...scala => BatchPhysicalLocalSortAggregate.scala} | 76 ++++++----------------
 .../physical/batch/BatchPhysicalAggRuleBase.scala  |  4 +-
 .../physical/batch/BatchPhysicalSortAggRule.scala  |  4 +-
 .../batch/RemoveRedundantLocalSortAggRule.scala    | 16 ++---
 5 files changed, 33 insertions(+), 69 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index 72f6891..5b809e8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -623,7 +623,7 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
             } else {
               null
             }
-          case agg: BatchExecLocalSortAggregate =>
+          case agg: BatchPhysicalLocalSortAggregate =>
             getAggCallFromLocalAgg(aggCallIndex, agg.getAggCallList, 
agg.getInput.getRowType)
           case agg: BatchPhysicalSortAggregate if agg.isMerge =>
             val aggCallIndexInLocalAgg = getAggCallIndexInLocalAgg(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortAggregate.scala
similarity index 67%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortAggregate.scala
index ed4d362..4306431 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortAggregate.scala
@@ -18,20 +18,12 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext
-import 
org.apache.flink.table.planner.codegen.agg.batch.{AggWithoutKeysCodeGenerator, 
SortAggCodeGenerator}
-import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, 
FlinkRelDistributionTraitDef}
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, 
LegacyBatchExecNode}
-import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, 
RelExplainUtil}
-import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelDistribution.Type
@@ -49,7 +41,7 @@ import scala.collection.JavaConversions._
  *
  * @see [[BatchPhysicalGroupAggregateBase]] for more info.
  */
-class BatchExecLocalSortAggregate(
+class BatchPhysicalLocalSortAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -67,11 +59,10 @@ class BatchExecLocalSortAggregate(
     auxGrouping,
     aggCallToAggFunction,
     isMerge = false,
-    isFinal = false)
-  with LegacyBatchExecNode[RowData] {
+    isFinal = false) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    new BatchExecLocalSortAggregate(
+    new BatchPhysicalLocalSortAggregate(
       cluster,
       traitSet,
       inputs.get(0),
@@ -136,52 +127,25 @@ class BatchExecLocalSortAggregate(
     Some(copy(newProvidedTraits, Seq(newInput)))
   }
 
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val input = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val ctx = CodeGeneratorContext(planner.getTableConfig)
-    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
-    val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
-
-    val aggInfos = transformToBatchAggregateInfoList(
-      FlinkTypeFactory.toLogicalRowType(inputRowType), getAggCallList)
-
-    val generatedOperator = if (grouping.isEmpty) {
-      AggWithoutKeysCodeGenerator.genWithoutKeys(
-        ctx, planner.getRelBuilder, aggInfos, inputType, outputType, isMerge, 
isFinal, "NoGrouping")
-    } else {
-      SortAggCodeGenerator.genWithKeys(
-        ctx,
-        planner.getRelBuilder,
-        aggInfos,
-        inputType,
-        outputType,
-        grouping,
-        auxGrouping,
-        isMerge,
-        isFinal)
-    }
-    val operator = new CodeGenOperatorFactory[RowData](generatedOperator)
-    ExecNodeUtil.createOneInputTransformation(
-      input,
-      getRelDetailedDescription,
-      operator,
-      InternalTypeInfo.of(outputType),
-      input.getParallelism,
-      0)
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecSortAggregate(
+      grouping,
+      auxGrouping,
+      getAggCallList.toArray,
+      FlinkTypeFactory.toLogicalRowType(inputRowType),
+      false, // isMerge is always false
+      false, // isFinal is always false
+      getInputEdge,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
 
-  override def getInputEdges: util.List[ExecEdge] = {
+  private def getInputEdge: ExecEdge = {
     if (grouping.length == 0) {
-      List(
-        ExecEdge.builder()
-          .damBehavior(ExecEdge.DamBehavior.END_INPUT)
-          .build())
+      ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build()
     } else {
-      List(ExecEdge.DEFAULT)
+      ExecEdge.DEFAULT
     }
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala
index bc0aabb..a04c9fb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.planner.JArrayList
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortAggregate,
 BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashAggregate}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGroupAggregateBase,
 BatchPhysicalLocalHashAggregate, BatchPhysicalLocalSortAggregate}
 import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
FlinkRelOptUtil}
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
 import 
org.apache.flink.table.planner.utils.TableConfigUtils.getAggPhaseStrategy
@@ -219,7 +219,7 @@ trait BatchPhysicalAggRuleBase {
         auxGrouping,
         aggCallToAggFunction)
     } else {
-      new BatchExecLocalSortAggregate(
+      new BatchPhysicalLocalSortAggregate(
         cluster,
         traitSet,
         input,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala
index f3a931e..71175d8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConversions._
  *   BatchPhysicalSortAggregate (global)
  *   +- Sort (exists if group keys are not empty)
  *      +- BatchPhysicalExchange (hash by group keys if group keys is not 
empty, else singleton)
- *         +- BatchExecLocalSortAggregate (local)
+ *         +- BatchPhysicalLocalSortAggregate (local)
  *           +- Sort (exists if group keys are not empty)
  *              +- input of agg
  * }}}
@@ -88,7 +88,7 @@ class BatchPhysicalSortAggRule
 
     // create two-phase agg if possible
     if (isTwoPhaseAggWorkable(aggFunctions, tableConfig)) {
-      // create BatchExecLocalSortAggregate
+      // create BatchPhysicalLocalSortAggregate
       var localRequiredTraitSet = 
input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
       if (agg.getGroupCount != 0) {
         val sortCollation = createRelCollation(groupSet)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
index ae8d666..32920f9 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.rules.physical.batch
 
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortAggregate,
 BatchPhysicalSort, BatchPhysicalSortAggregate}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalLocalSortAggregate,
 BatchPhysicalSort, BatchPhysicalSortAggregate}
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
@@ -56,7 +56,7 @@ abstract class RemoveRedundantLocalSortAggRule(
 
   private[table] def getOriginalGlobalAgg(call: RelOptRuleCall): 
BatchPhysicalSortAggregate
 
-  private[table] def getOriginalLocalAgg(call: RelOptRuleCall): 
BatchExecLocalSortAggregate
+  private[table] def getOriginalLocalAgg(call: RelOptRuleCall): 
BatchPhysicalLocalSortAggregate
 
   private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode
 
@@ -64,7 +64,7 @@ abstract class RemoveRedundantLocalSortAggRule(
 
 class RemoveRedundantLocalSortAggWithoutSortRule extends 
RemoveRedundantLocalSortAggRule(
   operand(classOf[BatchPhysicalSortAggregate],
-    operand(classOf[BatchExecLocalSortAggregate],
+    operand(classOf[BatchPhysicalLocalSortAggregate],
       operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
   "RemoveRedundantLocalSortAggWithoutSortRule") {
 
@@ -74,8 +74,8 @@ class RemoveRedundantLocalSortAggWithoutSortRule extends 
RemoveRedundantLocalSor
   }
 
   override private[table] def getOriginalLocalAgg(
-      call: RelOptRuleCall): BatchExecLocalSortAggregate = {
-    call.rels(1).asInstanceOf[BatchExecLocalSortAggregate]
+      call: RelOptRuleCall): BatchPhysicalLocalSortAggregate = {
+    call.rels(1).asInstanceOf[BatchPhysicalLocalSortAggregate]
   }
 
   override private[table] def getOriginalInputOfLocalAgg(call: 
RelOptRuleCall): RelNode = {
@@ -87,7 +87,7 @@ class RemoveRedundantLocalSortAggWithoutSortRule extends 
RemoveRedundantLocalSor
 class RemoveRedundantLocalSortAggWithSortRule extends 
RemoveRedundantLocalSortAggRule(
   operand(classOf[BatchPhysicalSortAggregate],
     operand(classOf[BatchPhysicalSort],
-      operand(classOf[BatchExecLocalSortAggregate],
+      operand(classOf[BatchPhysicalLocalSortAggregate],
         operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any)))),
   "RemoveRedundantLocalSortAggWithSortRule") {
 
@@ -97,8 +97,8 @@ class RemoveRedundantLocalSortAggWithSortRule extends 
RemoveRedundantLocalSortAg
   }
 
   override private[table] def getOriginalLocalAgg(
-      call: RelOptRuleCall): BatchExecLocalSortAggregate = {
-    call.rels(2).asInstanceOf[BatchExecLocalSortAggregate]
+      call: RelOptRuleCall): BatchPhysicalLocalSortAggregate = {
+    call.rels(2).asInstanceOf[BatchPhysicalLocalSortAggregate]
   }
 
   override private[table] def getOriginalInputOfLocalAgg(call: 
RelOptRuleCall): RelNode = {

Reply via email to