This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 36352a3a387e108c0087765d82e398b393111a7b Author: godfreyhe <[email protected]> AuthorDate: Thu Jan 7 15:30:54 2021 +0800 [FLINK-20857][table-planner-blink] Introduce BatchPhysicalSortWindowAggregate & BatchPhysicalLocalSortWindowAggregate, and make BatchExecSortWindowAggregate only extended from ExecNode This closes #14574 --- .../exec/batch/BatchExecSortWindowAggregate.java | 150 +++++++++++++++++++++ .../agg/batch/SortWindowCodeGenerator.scala | 12 +- .../metadata/AggCallSelectivityEstimator.scala | 4 +- .../plan/metadata/FlinkRelMdColumnInterval.scala | 2 +- .../planner/plan/metadata/FlinkRelMdSize.scala | 2 +- .../batch/BatchExecSortWindowAggregateBase.scala | 128 ------------------ ...=> BatchPhysicalLocalSortWindowAggregate.scala} | 37 +++-- ...cala => BatchPhysicalSortWindowAggregate.scala} | 39 ++++-- ... => BatchPhysicalSortWindowAggregateBase.scala} | 61 ++++----- .../batch/BatchPhysicalWindowAggregateRule.scala | 8 +- .../table/planner/plan/utils/FlinkRelMdUtil.scala | 10 +- 11 files changed, 240 insertions(+), 213 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java new file mode 100644 index 0000000..ccf3109 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortWindowAggregate.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty; +import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import org.apache.flink.table.planner.codegen.agg.batch.SortWindowCodeGenerator; +import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.runtime.generated.GeneratedOperator; +import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rel.core.AggregateCall; + +import java.util.Arrays; +import java.util.Collections; + +/** Batch {@link ExecNode} for sort-based window aggregate operator. */ +public class BatchExecSortWindowAggregate extends ExecNodeBase<RowData> + implements BatchExecNode<RowData> { + + private final int[] grouping; + private final int[] auxGrouping; + private final AggregateCall[] aggCalls; + private final LogicalWindow window; + private final int inputTimeFieldIndex; + private final boolean inputTimeIsDate; + private final PlannerNamedWindowProperty[] namedWindowProperties; + private final RowType aggInputRowType; + private final boolean enableAssignPane; + private final boolean isMerge; + private final boolean isFinal; + + public BatchExecSortWindowAggregate( + int[] grouping, + int[] auxGrouping, + AggregateCall[] aggCalls, + LogicalWindow window, + int inputTimeFieldIndex, + boolean inputTimeIsDate, + PlannerNamedWindowProperty[] namedWindowProperties, + RowType aggInputRowType, + boolean enableAssignPane, + boolean isMerge, + boolean isFinal, + ExecEdge inputEdge, + RowType outputType, + String description) { + super(Collections.singletonList(inputEdge), outputType, description); + this.grouping = grouping; + this.auxGrouping = auxGrouping; + this.aggCalls = aggCalls; + this.window = window; + this.inputTimeFieldIndex = inputTimeFieldIndex; + this.inputTimeIsDate = inputTimeIsDate; + this.namedWindowProperties = namedWindowProperties; + this.aggInputRowType = aggInputRowType; + this.enableAssignPane = enableAssignPane; + this.isMerge = isMerge; + this.isFinal = isFinal; + } + + @SuppressWarnings("unchecked") + @Override + protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) { + final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0); + final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner); + + final AggregateInfoList aggInfos = + AggregateUtil.transformToBatchAggregateInfoList( + aggInputRowType, + JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)), + null, + null); + + final TableConfig tableConfig = planner.getTableConfig(); + final int groupBufferLimitSize = + tableConfig + .getConfiguration() + .getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT); + + final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window); + final SortWindowCodeGenerator windowCodeGenerator = + new SortWindowCodeGenerator( + new CodeGeneratorContext(tableConfig), + planner.getRelBuilder(), + window, + inputTimeFieldIndex, + inputTimeIsDate, + JavaScalaConversionUtil.toScala(Arrays.asList(namedWindowProperties)), + aggInfos, + (RowType) inputNode.getOutputType(), + (RowType) getOutputType(), + groupBufferLimitSize, + 0L, + windowSizeAndSlideSize.f0, + windowSizeAndSlideSize.f1, + grouping, + auxGrouping, + enableAssignPane, + isMerge, + isFinal); + + final GeneratedOperator<OneInputStreamOperator<RowData, RowData>> generatedOperator; + if (grouping.length == 0) { + generatedOperator = windowCodeGenerator.genWithoutKeys(); + } else { + generatedOperator = windowCodeGenerator.genWithKeys(); + } + + return new OneInputTransformation<>( + inputTransform, + getDesc(), + new CodeGenOperatorFactory<>(generatedOperator), + InternalTypeInfo.of(getOutputType()), + inputTransform.getParallelism()); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala index bce1f56..fbe1b91 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortWindowCodeGenerator.scala @@ -32,7 +32,6 @@ import org.apache.flink.table.runtime.operators.TableStreamOperator import org.apache.flink.table.runtime.operators.window.TimeWindow import org.apache.flink.table.types.logical.RowType -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.tools.RelBuilder /** @@ -63,7 +62,6 @@ class SortWindowCodeGenerator( namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, inputRowType: RowType, - inputType: RowType, outputType: RowType, buffLimitSize: Int, windowStart: Long, @@ -125,7 +123,7 @@ class SortWindowCodeGenerator( slideSize, windowSize, inputTerm, - inputType, + inputRowType, outputType, windowsGrouping, windowElementType, @@ -156,7 +154,7 @@ class SortWindowCodeGenerator( val className = if (isFinal) "SortWinAggWithoutKeys" else "LocalSortWinAggWithoutKeys" val baseClass = classOf[TableStreamOperator[_]].getName AggCodeGenHelper.generateOperator( - ctx, className, baseClass, processCode, endInputCode, inputType) + ctx, className, baseClass, processCode, endInputCode, inputRowType) } def genWithKeys(): GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = { @@ -169,7 +167,7 @@ class SortWindowCodeGenerator( val keyProjectionCode = ProjectionCodeGenerator.generateProjectionExpression( ctx, - inputType, + inputRowType, groupKeyRowType, grouping, inputTerm = inputTerm, @@ -207,7 +205,7 @@ class SortWindowCodeGenerator( slideSize, windowSize, inputTerm, - inputType, + inputRowType, outputType, windowsGrouping, windowElementType, @@ -254,7 +252,7 @@ class SortWindowCodeGenerator( val className = if (isFinal) "SortWinAggWithKeys" else "LocalSortWinAggWithKeys" val baseClass = classOf[TableStreamOperator[_]].getName AggCodeGenHelper.generateOperator( - ctx, className, baseClass, processCode, endInputCode, inputType) + ctx, className, baseClass, processCode, endInputCode, inputRowType) } private def choosePreAcc: Boolean = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala index 26eba34..ba595d7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.planner.JDouble -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalLocalSortWindowAggregate, BatchPhysicalWindowAggregateBase} import org.apache.flink.table.planner.plan.stats._ import org.apache.flink.table.planner.plan.utils.AggregateUtil @@ -65,7 +65,7 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: FlinkRelMetadataQuery) case rel: BatchPhysicalLocalHashWindowAggregate => val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ rel.auxGrouping (fullGrouping, rel.getAggCallList) - case rel: BatchExecLocalSortWindowAggregate => + case rel: BatchPhysicalLocalSortWindowAggregate => val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ rel.auxGrouping (fullGrouping, rel.getAggCallList) case rel: BatchPhysicalWindowAggregateBase => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala index b1eec16..f0a2b8e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala @@ -542,7 +542,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { case agg: StreamPhysicalGroupWindowAggregate => agg.grouping case agg: BatchPhysicalGroupAggregateBase => agg.grouping ++ agg.auxGrouping case agg: Aggregate => AggregateUtil.checkAndGetFullGroupSet(agg) - case agg: BatchExecLocalSortWindowAggregate => + case agg: BatchPhysicalLocalSortWindowAggregate => // grouping + assignTs + auxGrouping agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping case agg: BatchPhysicalLocalHashWindowAggregate => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala index 199e5d4..eed5805 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala @@ -219,7 +219,7 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] { agg.auxGrouping.zipWithIndex.map { case (k, v) => k -> (agg.grouping.length + 1 + v) }.toMap - case agg: BatchExecLocalSortWindowAggregate => + case agg: BatchPhysicalLocalSortWindowAggregate => // local win-agg output type: grouping + assignTs + auxGrouping + aggCalls agg.grouping.zipWithIndex.toMap ++ agg.auxGrouping.zipWithIndex.map { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala deleted file mode 100644 index 2395de8..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.physical.batch - -import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.data.RowData -import org.apache.flink.table.functions.UserDefinedFunction -import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.CodeGeneratorContext -import org.apache.flink.table.planner.codegen.agg.batch.{SortWindowCodeGenerator, WindowCodeGenerator} -import org.apache.flink.table.planner.delegation.BatchPlanner -import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} -import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.LegacyBatchExecNode -import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil -import org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToBatchAggregateInfoList -import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo - -import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.rel.metadata.RelMetadataQuery - -abstract class BatchExecSortWindowAggregateBase( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - outputRowType: RelDataType, - aggInputRowType: RelDataType, - grouping: Array[Int], - auxGrouping: Array[Int], - aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)], - window: LogicalWindow, - inputTimeFieldIndex: Int, - inputTimeIsDate: Boolean, - namedWindowProperties: Seq[PlannerNamedWindowProperty], - enableAssignPane: Boolean = false, - isMerge: Boolean, - isFinal: Boolean) - extends BatchPhysicalWindowAggregateBase( - cluster, - traitSet, - inputRel, - outputRowType, - grouping, - auxGrouping, - aggCallToAggFunction, - window, - namedWindowProperties, - enableAssignPane, - isMerge, - isFinal) - with LegacyBatchExecNode[RowData] { - - override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { - val inputRowCnt = mq.getRowCount(getInput) - if (inputRowCnt == null) { - return null - } - // does not take pane optimization into consideration here - // sort is not done here - val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size - val averageRowSize: Double = mq.getAverageRowSize(this) - val memCost = averageRowSize - val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory] - costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost) - } - - //~ ExecNode methods ----------------------------------------------------------- - - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val input = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val ctx = CodeGeneratorContext(planner.getTableConfig) - val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) - val inputRowType = getInput().getRowType - val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) - - val aggInfos = transformToBatchAggregateInfoList( - FlinkTypeFactory.toLogicalRowType(aggInputRowType), aggCallToAggFunction.map(_._1)) - - val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger( - ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT) - - val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window) - - val generator = new SortWindowCodeGenerator( - ctx, planner.getRelBuilder, window, inputTimeFieldIndex, - inputTimeIsDate, namedWindowProperties, - aggInfos, inputType, inputType, outputType, - groupBufferLimitSize, 0L, windowSizeAndSlideSize.f0, windowSizeAndSlideSize.f1, - grouping, auxGrouping, enableAssignPane, isMerge, isFinal) - val generatedOperator = if (grouping.isEmpty) { - generator.genWithoutKeys() - } else { - generator.genWithKeys() - } - val operator = new CodeGenOperatorFactory[RowData](generatedOperator) - ExecNodeUtil.createOneInputTransformation( - input, - getRelDetailedDescription, - operator, - InternalTypeInfo.of(outputType), - input.getParallelism, - 0) - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortWindowAggregate.scala similarity index 70% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortWindowAggregate.scala index 28a2ea3..e7bd594 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLocalSortWindowAggregate.scala @@ -20,8 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortWindowAggregate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode @@ -30,9 +32,8 @@ import org.apache.calcite.rel.core.AggregateCall import java.util -import scala.collection.JavaConversions._ - -class BatchExecLocalSortWindowAggregate( +/** Batch physical RelNode for local sort-based window aggregate. */ +class BatchPhysicalLocalSortWindowAggregate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -46,25 +47,22 @@ class BatchExecLocalSortWindowAggregate( inputTimeIsDate: Boolean, namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false) - extends BatchExecSortWindowAggregateBase( + extends BatchPhysicalSortWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - inputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - inputTimeFieldIndex, - inputTimeIsDate, namedWindowProperties, enableAssignPane, isMerge = false, isFinal = false) { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchExecLocalSortWindowAggregate( + new BatchPhysicalLocalSortWindowAggregate( cluster, traitSet, inputs.get(0), @@ -80,7 +78,22 @@ class BatchExecLocalSortWindowAggregate( enableAssignPane) } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) + override def translateToExecNode(): ExecNode[_] = { + new BatchExecSortWindowAggregate( + grouping, + auxGrouping, + getAggCallList.toArray, + window, + inputTimeFieldIndex, + inputTimeIsDate, + namedWindowProperties.toArray, + FlinkTypeFactory.toLogicalRowType(inputRowType), + enableAssignPane, + false, // isMerge is always false + false, // isFinal is always false + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregate.scala similarity index 72% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregate.scala index d0552b7..8195149 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregate.scala @@ -20,19 +20,18 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortWindowAggregate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import java.util - -import scala.collection.JavaConversions._ - -class BatchExecSortWindowAggregate( +/** Batch physical RelNode for (global) sort-based window aggregate. */ +class BatchPhysicalSortWindowAggregate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -47,25 +46,22 @@ class BatchExecSortWindowAggregate( namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean) - extends BatchExecSortWindowAggregateBase( + extends BatchPhysicalSortWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - aggInputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - inputTimeFieldIndex, - inputTimeIsDate, namedWindowProperties, enableAssignPane, isMerge, isFinal = true) { override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new BatchExecSortWindowAggregate( + new BatchPhysicalSortWindowAggregate( cluster, traitSet, inputs.get(0), @@ -82,7 +78,22 @@ class BatchExecSortWindowAggregate( isMerge) } - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) + override def translateToExecNode(): ExecNode[_] = { + new BatchExecSortWindowAggregate( + grouping, + auxGrouping, + getAggCallList.toArray, + window, + inputTimeFieldIndex, + inputTimeIsDate, + namedWindowProperties.toArray, + FlinkTypeFactory.toLogicalRowType(aggInputRowType), + enableAssignPane, + isMerge, + true, // isFinal is always true + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregateBase.scala similarity index 59% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregateBase.scala index d0552b7..fde069b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortWindowAggregateBase.scala @@ -20,69 +20,54 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty +import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory} import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.metadata.RelMetadataQuery -import java.util - -import scala.collection.JavaConversions._ - -class BatchExecSortWindowAggregate( +/** Batch physical RelNode for sort-based window aggregate. */ +abstract class BatchPhysicalSortWindowAggregateBase( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - aggInputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)], window: LogicalWindow, - inputTimeFieldIndex: Int, - inputTimeIsDate: Boolean, - namedWindowProperties: Seq[PlannerNamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, - isMerge: Boolean) - extends BatchExecSortWindowAggregateBase( + isMerge: Boolean, + isFinal: Boolean) + extends BatchPhysicalWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - aggInputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - inputTimeFieldIndex, - inputTimeIsDate, - namedWindowProperties, + namedProperties, enableAssignPane, isMerge, - isFinal = true) { + isFinal) { - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new BatchExecSortWindowAggregate( - cluster, - traitSet, - inputs.get(0), - getRowType, - aggInputRowType, - grouping, - auxGrouping, - aggCallToAggFunction, - window, - inputTimeFieldIndex, - inputTimeIsDate, - namedWindowProperties, - enableAssignPane, - isMerge) + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { + val inputRowCnt = mq.getRowCount(getInput) + if (inputRowCnt == null) { + return null + } + // does not take pane optimization into consideration here + // sort is not done here + val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size + val averageRowSize: Double = mq.getAverageRowSize(this) + val memCost = averageRowSize + val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory] + costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost) } - - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala index ff79e51..e881a06 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchExecSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalLocalSortWindowAggregate, BatchPhysicalSortWindowAggregate, BatchPhysicalHashWindowAggregate, BatchPhysicalLocalHashWindowAggregate} import org.apache.flink.table.planner.plan.utils.AggregateUtil import org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate @@ -202,7 +202,7 @@ class BatchPhysicalWindowAggregateRule val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet) val localProvidedTraitSet = localRequiredTraitSet - new BatchExecLocalSortWindowAggregate( + new BatchPhysicalLocalSortWindowAggregate( agg.getCluster, localProvidedTraitSet, newLocalInput, @@ -257,7 +257,7 @@ class BatchPhysicalWindowAggregateRule .replace(createRelCollation(groupSet.indices.toArray :+ groupSet.length)) val newGlobalAggInput = RelOptRule.convert(localAgg, globalRequiredTraitSet) - new BatchExecSortWindowAggregate( + new BatchPhysicalSortWindowAggregate( agg.getCluster, aggProvidedTraitSet, newGlobalAggInput, @@ -314,7 +314,7 @@ class BatchPhysicalWindowAggregateRule groupSet :+ inputTimeFieldIndex)) val newInput = RelOptRule.convert(input, requiredTraitSet) - new BatchExecSortWindowAggregate( + new BatchPhysicalSortWindowAggregate( agg.getCluster, aggProvidedTraitSet, newInput, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala index 81b436e..3c27a1d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate} -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalWindowAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGroupAggregateBase, BatchPhysicalLocalHashWindowAggregate, BatchPhysicalLocalSortWindowAggregate, BatchPhysicalWindowAggregateBase} import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange} import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES @@ -330,14 +330,12 @@ object FlinkRelMdUtil { groupKey: ImmutableBitSet, agg: SingleRel): (ImmutableBitSet, Array[AggregateCall]) = { val (aggCalls, fullGroupSet) = agg match { - case agg: BatchExecLocalSortWindowAggregate => + case agg: BatchPhysicalLocalSortWindowAggregate => // grouping + assignTs + auxGrouping - (agg.getAggCallList, - agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) + (agg.getAggCallList, agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) case agg: BatchPhysicalLocalHashWindowAggregate => // grouping + assignTs + auxGrouping - (agg.getAggCallList, - agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) + (agg.getAggCallList, agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) case agg: BatchPhysicalWindowAggregateBase => (agg.getAggCallList, agg.grouping ++ agg.auxGrouping) case agg: BatchPhysicalGroupAggregateBase =>
