This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81849341df4375173346eb9f9095d7bc78bd0da3 Author: godfreyhe <[email protected]> AuthorDate: Thu Jan 7 15:46:34 2021 +0800 [FLINK-20857][table-planner-blink] Introduce BatchPhysicalPythonGroupWindowAggregate, and make BatchExecPythonGroupWindowAggregate only extended from ExecNode This closes #14574 --- ...=> BatchPhysicalPythonWindowAggregateRule.java} | 16 +-- .../BatchExecPythonGroupWindowAggregate.scala | 112 +++++--------------- .../BatchPhysicalPythonGroupWindowAggregate.scala | 113 +++++++++++++++++++++ .../planner/plan/rules/FlinkBatchRuleSets.scala | 2 +- 4 files changed, 148 insertions(+), 95 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java similarity index 94% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java index ac33ec8..ffc9c6a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java @@ -29,7 +29,7 @@ import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow; import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow; import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate; -import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupWindowAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalPythonGroupWindowAggregate; import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.flink.table.planner.plan.utils.AggregateUtil; import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; @@ -56,17 +56,17 @@ import scala.collection.Seq; /** * The physical rule is responsible for convert {@link FlinkLogicalWindowAggregate} to {@link - * BatchExecPythonGroupWindowAggregate}. + * BatchPhysicalPythonGroupWindowAggregate}. */ -public class BatchExecPythonWindowAggregateRule extends RelOptRule { +public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule { - public static final RelOptRule INSTANCE = new BatchExecPythonWindowAggregateRule(); + public static final RelOptRule INSTANCE = new BatchPhysicalPythonWindowAggregateRule(); - private BatchExecPythonWindowAggregateRule() { + private BatchPhysicalPythonWindowAggregateRule() { super( operand(FlinkLogicalWindowAggregate.class, operand(RelNode.class, any())), FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE(), - "BatchExecPythonWindowAggregateRule"); + "BatchPhysicalPythonWindowAggregateRule"); } @Override @@ -146,8 +146,8 @@ public class BatchExecPythonWindowAggregateRule extends RelOptRule { requiredTraitSet = requiredTraitSet.replace(sortCollation); RelNode newInput = RelOptRule.convert(input, requiredTraitSet); - BatchExecPythonGroupWindowAggregate windowAgg = - new BatchExecPythonGroupWindowAggregate( + BatchPhysicalPythonGroupWindowAggregate windowAgg = + new BatchPhysicalPythonGroupWindowAggregate( agg.getCluster(), traitSet, newInput, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala similarity index 62% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala index a5e5d59..52b8ba8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.plan.nodes.physical.batch +package org.apache.flink.table.planner.plan.nodes.exec.batch import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.Configuration @@ -25,115 +25,55 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.data.RowData -import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.functions.python.PythonFunctionInfo import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty -import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator -import org.apache.flink.table.planner.delegation.BatchPlanner +import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart} -import 
org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} import org.apache.flink.table.planner.plan.logical.LogicalWindow +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate -import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, LegacyBatchExecNode} -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, ExecNodeBase} import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.types.logical.RowType -import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.rel.metadata.RelMetadataQuery -import java.util - -import scala.collection.JavaConversions._ +import java.util.Collections /** - * Batch physical RelNode for group widow aggregate (Python user defined aggregate function). - */ + * Batch [[ExecNode]] for group window aggregate (Python user defined aggregate function). + * + * <p>Note: This class can't be ported to Java, + * because a Java class can't extend a Scala trait with default implementations. + * FLINK-20858 will port this class to Java.
+ */ class BatchExecPythonGroupWindowAggregate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - outputRowType: RelDataType, - inputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], - aggCalls: Seq[AggregateCall], - aggFunctions: Array[UserDefinedFunction], + aggCalls: Array[AggregateCall], window: LogicalWindow, inputTimeFieldIndex: Int, - inputTimeIsDate: Boolean, - namedWindowProperties: Seq[PlannerNamedWindowProperty]) - extends BatchPhysicalWindowAggregateBase( - cluster, - traitSet, - inputRel, - outputRowType, - grouping, - auxGrouping, - aggCalls.zip(aggFunctions), - window, - namedWindowProperties, - false, - false, - true) - with LegacyBatchExecNode[RowData] + namedWindowProperties: Array[PlannerNamedWindowProperty], + inputEdge: ExecEdge, + outputType: RowType, + description: String) + extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description) + with BatchExecNode[RowData] with CommonExecPythonAggregate { - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecPythonGroupWindowAggregate( - cluster, - traitSet, - inputs.get(0), - outputRowType, - inputRowType, - grouping, - auxGrouping, - aggCalls, - aggFunctions, - window, - inputTimeFieldIndex, - inputTimeIsDate, - namedWindowProperties) - } - - override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { - val inputRowCnt = mq.getRowCount(getInput) - if (inputRowCnt == null) { - return null - } - // does not take pane optimization into consideration here - // sort is not done here - val aggCallToAggFunction = aggCalls.zip(aggFunctions) - val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size - val averageRowSize: Double = mq.getAverageRowSize(this) - val memCost = averageRowSize - val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory] - costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost) - } - - //~ 
ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val input = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) - val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) + override protected def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = { + val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]] + val inputTransform = inputNode.translateToPlan(planner) val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window) val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger( ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT) - val ret = createPythonOneInputTransformation( - input, - inputType, + val transform = createPythonOneInputTransformation( + inputTransform, + inputNode.getOutputType.asInstanceOf[RowType], outputType, inputTimeFieldIndex, groupBufferLimitSize, @@ -142,9 +82,9 @@ class BatchExecPythonGroupWindowAggregate( getConfig(planner.getExecEnv, planner.getTableConfig)) if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { - ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON) + transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON) } - ret + transform } private[this] def createPythonOneInputTransformation( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala new file mode 100644 index 0000000..a6e8c21 --- /dev/null +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.physical.batch + +import org.apache.flink.table.functions.UserDefinedFunction +import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty +import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} +import org.apache.flink.table.planner.plan.logical.LogicalWindow +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.metadata.RelMetadataQuery + +import java.util + +/** + * Batch physical RelNode for group widow aggregate (Python user defined aggregate 
function). + */ +class BatchPhysicalPythonGroupWindowAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + outputRowType: RelDataType, + inputRowType: RelDataType, + grouping: Array[Int], + auxGrouping: Array[Int], + aggCalls: Seq[AggregateCall], + aggFunctions: Array[UserDefinedFunction], + window: LogicalWindow, + inputTimeFieldIndex: Int, + inputTimeIsDate: Boolean, + namedWindowProperties: Seq[PlannerNamedWindowProperty]) + extends BatchPhysicalWindowAggregateBase( + cluster, + traitSet, + inputRel, + outputRowType, + grouping, + auxGrouping, + aggCalls.zip(aggFunctions), + window, + namedWindowProperties, + false, + false, + true) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new BatchPhysicalPythonGroupWindowAggregate( + cluster, + traitSet, + inputs.get(0), + outputRowType, + inputRowType, + grouping, + auxGrouping, + aggCalls, + aggFunctions, + window, + inputTimeFieldIndex, + inputTimeIsDate, + namedWindowProperties) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { + val inputRowCnt = mq.getRowCount(getInput) + if (inputRowCnt == null) { + return null + } + // does not take pane optimization into consideration here + // sort is not done here + val aggCallToAggFunction = aggCalls.zip(aggFunctions) + val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size + val averageRowSize: Double = mq.getAverageRowSize(this) + val memCost = averageRowSize + val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory] + costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost) + } + + override def translateToExecNode(): ExecNode[_] = { + new BatchExecPythonGroupWindowAggregate( + grouping, + auxGrouping, + aggCalls.toArray, + window, + inputTimeFieldIndex, + namedWindowProperties.toArray, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } +} diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index bd25c16..e7aad9a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -424,7 +424,7 @@ object FlinkBatchRuleSets { BatchExecOverAggregateRule.INSTANCE, // window agg BatchPhysicalWindowAggregateRule.INSTANCE, - BatchExecPythonWindowAggregateRule.INSTANCE, + BatchPhysicalPythonWindowAggregateRule.INSTANCE, // join BatchExecHashJoinRule.INSTANCE, BatchExecSortMergeJoinRule.INSTANCE,
