This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 45417078b3da3ee81113e393733b834e2b786056 Author: godfreyhe <[email protected]> AuthorDate: Wed Dec 23 19:45:14 2020 +0800 [FLINK-20737][table-planner-blink] Introduce StreamPhysicalPythonGroupTableAggregate, and make StreamExecPythonGroupTableAggregate only extended from ExecNode This closes #14478 --- ...reamPhysicalPythonGroupTableAggregateRule.java} | 21 +++-- .../StreamExecPythonGroupTableAggregate.scala | 89 ++++++++++------------ .../StreamPhysicalPythonGroupTableAggregate.scala | 76 ++++++++++++++++++ .../FlinkChangelogModeInferenceProgram.scala | 2 +- .../planner/plan/rules/FlinkStreamRuleSets.scala | 2 +- 5 files changed, 126 insertions(+), 64 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupTableAggregateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java similarity index 87% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupTableAggregateRule.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java index d8465c6..81bd1465 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupTableAggregateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupTableAggregateRule.java @@ -22,9 +22,10 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.functions.python.PythonFunctionKind; import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableAggregate; -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonGroupTableAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalPythonGroupTableAggregate; import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.flink.table.planner.plan.utils.PythonUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -35,22 +36,20 @@ import org.apache.calcite.rel.core.AggregateCall; import java.util.List; -import scala.collection.JavaConverters; - /** * Rule to convert a {@link FlinkLogicalTableAggregate} into a {@link - * StreamExecPythonGroupTableAggregate}. + * StreamPhysicalPythonGroupTableAggregateRule}. */ -public class StreamExecPythonGroupTableAggregateRule extends ConverterRule { +public class StreamPhysicalPythonGroupTableAggregateRule extends ConverterRule { - public static final RelOptRule INSTANCE = new StreamExecPythonGroupTableAggregateRule(); + public static final RelOptRule INSTANCE = new StreamPhysicalPythonGroupTableAggregateRule(); - public StreamExecPythonGroupTableAggregateRule() { + public StreamPhysicalPythonGroupTableAggregateRule() { super( FlinkLogicalTableAggregate.class, FlinkConventions.LOGICAL(), FlinkConventions.STREAM_PHYSICAL(), - "StreamExecPythonGroupTableAggregateRule"); + "StreamPhysicalPythonGroupTableAggregateRule"); } @Override @@ -106,14 +105,12 @@ public class StreamExecPythonGroupTableAggregateRule extends ConverterRule { rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()); RelNode newInput = RelOptRule.convert(agg.getInput(), requiredTraitSet); - return new StreamExecPythonGroupTableAggregate( + return new StreamPhysicalPythonGroupTableAggregate( rel.getCluster(), providedTraitSet, newInput, rel.getRowType(), agg.getGroupSet().toArray(), - JavaConverters.asScalaIteratorConverter(agg.getAggCallList().iterator()) - .asScala() - .toSeq()); + JavaScalaConversionUtil.toScala(agg.getAggCallList())); } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupTableAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.scala similarity index 72% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupTableAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.scala index 1251043..ea38246 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupTableAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.scala @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.table.planner.plan.nodes.physical.stream + +package org.apache.flink.table.planner.plan.nodes.exec.stream import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.Configuration @@ -24,54 +25,41 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.data.RowData import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.delegation.StreamPlanner -import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode +import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate -import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, KeySelectorUtil} +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, ExecNodeBase} +import org.apache.flink.table.planner.plan.utils.{AggregateUtil, KeySelectorUtil} import org.apache.flink.table.planner.typeutils.DataViewUtils.DataViewSpec +import org.apache.flink.table.planner.utils.Logging import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.types.logical.RowType -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import java.util +import java.util.Collections /** - * Stream physical RelNode for unbounded python group table aggregate. - */ + * Stream [[ExecNode]] for unbounded python group table aggregate. + * + * <p>Note: This class can't be ported to Java, + * because java class can't extend scala interface with default implementation. + * FLINK-20750 will port this class to Java. + */ class StreamExecPythonGroupTableAggregate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - outputRowType: RelDataType, grouping: Array[Int], - aggCalls: Seq[AggregateCall]) - extends StreamPhysicalGroupTableAggregateBase( - cluster, - traitSet, - inputRel, - outputRowType, - grouping, - aggCalls) - with LegacyStreamExecNode[RowData] - with CommonExecPythonAggregate { - - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new StreamExecPythonGroupTableAggregate( - cluster, - traitSet, - inputs.get(0), - outputRowType, - grouping, - aggCalls) - } - - override protected def translateToPlanInternal( - planner: StreamPlanner): Transformation[RowData] = { + aggCalls: Seq[AggregateCall], + aggCallNeedRetractions: Array[Boolean], + generateUpdateBefore: Boolean, + needRetraction: Boolean, + inputEdge: ExecEdge, + outputType: RowType, + description: String) + extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description) + with StreamExecNode[RowData] + with CommonExecPythonAggregate + with Logging { + + override protected def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = { val tableConfig = planner.getTableConfig if (grouping.length > 0 && tableConfig.getMinIdleStateRetentionTime < 0) { @@ -80,14 +68,17 @@ class StreamExecPythonGroupTableAggregate( "state size. You may specify a retention time of 0 to not clean up the state.") } - val inputTransformation = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - - val outRowType = FlinkTypeFactory.toLogicalRowType(outputRowType) - val inputRowType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType) + val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]] + val inputTransformation = inputNode.translateToPlan(planner) - val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) + val inputRowType = inputNode.getOutputType.asInstanceOf[RowType] + val aggInfoList = AggregateUtil.transformToStreamAggregateInfoList( + inputRowType, + aggCalls, + aggCallNeedRetractions, + needRetraction, + isStateBackendDataViews = true) val inputCountIndex = aggInfoList.getIndexOfCountStar var (pythonFunctionInfos, dataViewSpecs) = @@ -100,7 +91,7 @@ class StreamExecPythonGroupTableAggregate( val operator = getPythonTableAggregateFunctionOperator( getConfig(planner.getExecEnv, tableConfig), inputRowType, - outRowType, + outputType, pythonFunctionInfos, dataViewSpecs, tableConfig.getMinIdleStateRetentionTime, @@ -109,16 +100,14 @@ class StreamExecPythonGroupTableAggregate( generateUpdateBefore, inputCountIndex) - val selector = KeySelectorUtil.getRowDataSelector( - grouping, - InternalTypeInfo.of(inputRowType)) + val selector = KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType)) // partitioned aggregation val ret = new OneInputTransformation( inputTransformation, - getRelDetailedDescription, + getDesc, operator, - InternalTypeInfo.of(outRowType), + InternalTypeInfo.of(outputType), inputTransformation.getParallelism) if (inputsContainSingleton()) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupTableAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupTableAggregate.scala new file mode 100644 index 0000000..9c373d1 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupTableAggregate.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.nodes.physical.stream + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} +import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ChangelogPlanUtils} + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall + +import java.util + +/** + * Stream physical RelNode for unbounded python group table aggregate. + */ +class StreamPhysicalPythonGroupTableAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + outputRowType: RelDataType, + grouping: Array[Int], + aggCalls: Seq[AggregateCall]) + extends StreamPhysicalGroupTableAggregateBase( + cluster, + traitSet, + inputRel, + outputRowType, + grouping, + aggCalls) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamPhysicalPythonGroupTableAggregate( + cluster, + traitSet, + inputs.get(0), + outputRowType, + grouping, + aggCalls) + } + + override def translateToExecNode(): ExecNode[_] = { + val aggCallNeedRetractions = + AggregateUtil.deriveAggCallNeedRetractions(this, grouping.length, aggCalls) + val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) + val needRetraction = !ChangelogPlanUtils.inputInsertOnly(this) + new StreamExecPythonGroupTableAggregate( + grouping, + aggCalls, + aggCallNeedRetractions, + generateUpdateBefore, + needRetraction, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index b7723fa..75f3a83 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -463,7 +463,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate | _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate | - _: StreamExecPythonGroupTableAggregate => + _: StreamPhysicalPythonGroupTableAggregate => // Aggregate, TableAggregate and Limit requires update_before if there are updates val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0))) val children = visitChildren(rel, requiredChildTrait) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index d658e71..49ae726 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -423,7 +423,7 @@ object FlinkStreamRuleSets { StreamPhysicalGroupAggregateRule.INSTANCE, StreamPhysicalGroupTableAggregateRule.INSTANCE, StreamPhysicalPythonGroupAggregateRule.INSTANCE, - StreamExecPythonGroupTableAggregateRule.INSTANCE, + StreamPhysicalPythonGroupTableAggregateRule.INSTANCE, // over agg StreamExecOverAggregateRule.INSTANCE, StreamExecPythonOverAggregateRule.INSTANCE,
