This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63875b3c24ece384fd4e57d8e8ad5de9b27d3789 Author: godfreyhe <[email protected]> AuthorDate: Tue Jan 5 20:00:24 2021 +0800 [FLINK-20738][table-planner-blink] Introduce BatchPhysicalLocalSortAggregate, and make BatchPhysicalLocalSortAggregate only extended from FlinkPhysicalRel This closes #14562 --- .../plan/metadata/FlinkRelMdColumnInterval.scala | 2 +- ...scala => BatchPhysicalLocalSortAggregate.scala} | 76 ++++++---------------- .../physical/batch/BatchPhysicalAggRuleBase.scala | 4 +- .../physical/batch/BatchPhysicalSortAggRule.scala | 4 +- .../batch/RemoveRedundantLocalSortAggRule.scala | 16 ++--- 5 files changed, 33 insertions(+), 69 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala index 72f6891..5b809e8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala @@ -623,7 +623,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { } else { null } - case agg: BatchExecLocalSortAggregate => + case agg: BatchPhysicalLocalSortAggregate => getAggCallFromLocalAgg(aggCallIndex, agg.getAggCallList, agg.getInput.getRowType) case agg: BatchPhysicalSortAggregate if agg.isMerge => val aggCallIndexInLocalAgg = getAggCallIndexInLocalAgg( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortAggregate.scala similarity index 67% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortAggregate.scala index ed4d362..4306431 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortAggregate.scala @@ -18,20 +18,12 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.data.RowData import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.CodeGeneratorContext -import org.apache.flink.table.planner.codegen.agg.batch.{AggWithoutKeysCodeGenerator, SortAggCodeGenerator} -import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef} -import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil -import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, LegacyBatchExecNode} -import org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil} -import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelDistribution.Type @@ -49,7 +41,7 @@ import scala.collection.JavaConversions._ * * @see [[BatchPhysicalGroupAggregateBase]] for more info. */ -class BatchExecLocalSortAggregate( +class BatchPhysicalLocalSortAggregate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -67,11 +59,10 @@ class BatchExecLocalSortAggregate( auxGrouping, aggCallToAggFunction, isMerge = false, - isFinal = false) - with LegacyBatchExecNode[RowData] { + isFinal = false) { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecLocalSortAggregate( + new BatchPhysicalLocalSortAggregate( cluster, traitSet, inputs.get(0), @@ -136,52 +127,25 @@ class BatchExecLocalSortAggregate( Some(copy(newProvidedTraits, Seq(newInput))) } - //~ ExecNode methods ----------------------------------------------------------- - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val input = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val ctx = CodeGeneratorContext(planner.getTableConfig) - val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) - val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) - - val aggInfos = transformToBatchAggregateInfoList( - FlinkTypeFactory.toLogicalRowType(inputRowType), getAggCallList) - - val generatedOperator = if (grouping.isEmpty) { - AggWithoutKeysCodeGenerator.genWithoutKeys( - ctx, planner.getRelBuilder, aggInfos, inputType, outputType, isMerge, isFinal, "NoGrouping") - } else { - SortAggCodeGenerator.genWithKeys( - ctx, - planner.getRelBuilder, - aggInfos, - inputType, - outputType, - grouping, - auxGrouping, - isMerge, - isFinal) - } - val operator = new CodeGenOperatorFactory[RowData](generatedOperator) - ExecNodeUtil.createOneInputTransformation( - input, - getRelDetailedDescription, - operator, - InternalTypeInfo.of(outputType), - input.getParallelism, - 0) + override def translateToExecNode(): ExecNode[_] = { + new BatchExecSortAggregate( + grouping, + auxGrouping, + getAggCallList.toArray, + FlinkTypeFactory.toLogicalRowType(inputRowType), + false, // isMerge is always false + false, // isFinal is always false + getInputEdge, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) } - override def getInputEdges: util.List[ExecEdge] = { + private def getInputEdge: ExecEdge = { if (grouping.length == 0) { - List( - ExecEdge.builder() - .damBehavior(ExecEdge.DamBehavior.END_INPUT) - .build()) + ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).build() } else { - List(ExecEdge.DEFAULT) + ExecEdge.DEFAULT } } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala index bc0aabb..a04c9fb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalAggRuleBase.scala @@ -24,7 +24,7 @@ import org.apache.flink.table.planner.JArrayList import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._ -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashAggregate} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashAggregate, BatchPhysicalLocalSortAggregate} import org.apache.flink.table.planner.plan.utils.{AggregateUtil, FlinkRelOptUtil} import org.apache.flink.table.planner.utils.AggregatePhaseStrategy import org.apache.flink.table.planner.utils.TableConfigUtils.getAggPhaseStrategy @@ -219,7 +219,7 @@ trait BatchPhysicalAggRuleBase { auxGrouping, aggCallToAggFunction) } else { - new BatchExecLocalSortAggregate( + new BatchPhysicalLocalSortAggregate( cluster, traitSet, input, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala index f3a931e..71175d8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala @@ -39,7 +39,7 @@ import scala.collection.JavaConversions._ * BatchPhysicalSortAggregate (global) * +- Sort (exists if group keys are not empty) * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) - * +- BatchExecLocalSortAggregate (local) + * +- BatchPhysicalLocalSortAggregate (local) * +- Sort (exists if group keys are not empty) * +- input of agg * }}} @@ -88,7 +88,7 @@ class BatchPhysicalSortAggRule // create two-phase agg if possible if (isTwoPhaseAggWorkable(aggFunctions, tableConfig)) { - // create BatchExecLocalSortAggregate + // create BatchPhysicalLocalSortAggregate var localRequiredTraitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) if (agg.getGroupCount != 0) { val sortCollation = createRelCollation(groupSet) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala index ae8d666..32920f9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalSortAggRule.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.nodes.FlinkConventions -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortAggregate, BatchPhysicalSort, BatchPhysicalSortAggregate} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalLocalSortAggregate, BatchPhysicalSort, BatchPhysicalSortAggregate} import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} @@ -56,7 +56,7 @@ abstract class RemoveRedundantLocalSortAggRule( private[table] def getOriginalGlobalAgg(call: RelOptRuleCall): BatchPhysicalSortAggregate - private[table] def getOriginalLocalAgg(call: RelOptRuleCall): BatchExecLocalSortAggregate + private[table] def getOriginalLocalAgg(call: RelOptRuleCall): BatchPhysicalLocalSortAggregate private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode @@ -64,7 +64,7 @@ abstract class RemoveRedundantLocalSortAggRule( class RemoveRedundantLocalSortAggWithoutSortRule extends RemoveRedundantLocalSortAggRule( operand(classOf[BatchPhysicalSortAggregate], - operand(classOf[BatchExecLocalSortAggregate], + operand(classOf[BatchPhysicalLocalSortAggregate], operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))), "RemoveRedundantLocalSortAggWithoutSortRule") { @@ -74,8 +74,8 @@ class RemoveRedundantLocalSortAggWithoutSortRule extends RemoveRedundantLocalSor } override private[table] def getOriginalLocalAgg( - call: RelOptRuleCall): BatchExecLocalSortAggregate = { - call.rels(1).asInstanceOf[BatchExecLocalSortAggregate] + call: RelOptRuleCall): BatchPhysicalLocalSortAggregate = { + call.rels(1).asInstanceOf[BatchPhysicalLocalSortAggregate] } override private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode = { @@ -87,7 +87,7 @@ class RemoveRedundantLocalSortAggWithoutSortRule extends RemoveRedundantLocalSor class RemoveRedundantLocalSortAggWithSortRule extends RemoveRedundantLocalSortAggRule( operand(classOf[BatchPhysicalSortAggregate], operand(classOf[BatchPhysicalSort], - operand(classOf[BatchExecLocalSortAggregate], + operand(classOf[BatchPhysicalLocalSortAggregate], operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any)))), "RemoveRedundantLocalSortAggWithSortRule") { @@ -97,8 +97,8 @@ class RemoveRedundantLocalSortAggWithSortRule extends RemoveRedundantLocalSortAg } override private[table] def getOriginalLocalAgg( - call: RelOptRuleCall): BatchExecLocalSortAggregate = { - call.rels(2).asInstanceOf[BatchExecLocalSortAggregate] + call: RelOptRuleCall): BatchPhysicalLocalSortAggregate = { + call.rels(2).asInstanceOf[BatchPhysicalLocalSortAggregate] } override private[table] def getOriginalInputOfLocalAgg(call: RelOptRuleCall): RelNode = {
