This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 290658283d95be42454953ea484fe39b5ae0b3b4 Author: godfreyhe <[email protected]> AuthorDate: Wed Jan 6 21:12:01 2021 +0800 [FLINK-20857][table-planner-blink] Rename BatchExecWindowAggregateBase to BatchPhysicalWindowAggregateBase and do some refactoring This closes #14574 --- .../batch/BatchExecPythonWindowAggregateRule.java | 1 - .../metadata/AggCallSelectivityEstimator.scala | 10 +++---- .../plan/metadata/FlinkRelMdColumnInterval.scala | 10 +++---- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 6 ++-- .../plan/metadata/FlinkRelMdDistinctRowCount.scala | 8 ++--- .../plan/metadata/FlinkRelMdPopulationSize.scala | 6 ++-- .../planner/plan/metadata/FlinkRelMdRowCount.scala | 8 ++--- .../plan/metadata/FlinkRelMdSelectivity.scala | 8 ++--- .../planner/plan/metadata/FlinkRelMdSize.scala | 18 ++++++------ .../plan/metadata/FlinkRelMdUniqueGroups.scala | 7 ++--- .../plan/metadata/FlinkRelMdUniqueKeys.scala | 6 ++-- .../batch/BatchExecHashWindowAggregate.scala | 13 ++------- .../batch/BatchExecHashWindowAggregateBase.scala | 14 ++++----- .../batch/BatchExecLocalHashWindowAggregate.scala | 11 ++----- .../batch/BatchExecLocalSortWindowAggregate.scala | 11 ++----- .../BatchExecPythonGroupWindowAggregate.scala | 18 +++++------- .../batch/BatchExecSortWindowAggregate.scala | 13 ++------- .../batch/BatchExecSortWindowAggregateBase.scala | 15 ++++------ ...cala => BatchPhysicalWindowAggregateBase.scala} | 26 +++++++---------- .../batch/BatchExecWindowAggregateRule.scala | 10 ------- .../table/planner/plan/utils/FlinkRelMdUtil.scala | 34 +++++++++++----------- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 15 ---------- 22 files changed, 102 insertions(+), 166 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java index 958188d..ac33ec8 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonWindowAggregateRule.java @@ -149,7 +149,6 @@ public class BatchExecPythonWindowAggregateRule extends RelOptRule { BatchExecPythonGroupWindowAggregate windowAgg = new BatchExecPythonGroupWindowAggregate( agg.getCluster(), - call.builder(), traitSet, newInput, agg.getRowType(), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala index d7b5418..acc90d5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimator.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.planner.JDouble -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchExecWindowAggregateBase, BatchPhysicalGroupAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalWindowAggregateBase} import org.apache.flink.table.planner.plan.stats._ import org.apache.flink.table.planner.plan.utils.AggregateUtil @@ -63,13 +63,13 @@ class AggCallSelectivityEstimator(agg: RelNode, mq: FlinkRelMetadataQuery) case rel: BatchPhysicalGroupAggregateBase => (rel.grouping ++ rel.auxGrouping, rel.getAggCallList) case rel: BatchExecLocalHashWindowAggregate => - val fullGrouping = rel.getGrouping ++ Array(rel.inputTimeFieldIndex) ++ rel.getAuxGrouping + val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ rel.auxGrouping (fullGrouping, rel.getAggCallList) case rel: BatchExecLocalSortWindowAggregate => - val fullGrouping = rel.getGrouping ++ Array(rel.inputTimeFieldIndex) ++ rel.getAuxGrouping + val fullGrouping = rel.grouping ++ Array(rel.inputTimeFieldIndex) ++ rel.auxGrouping (fullGrouping, rel.getAggCallList) - case rel: BatchExecWindowAggregateBase => - (rel.getGrouping ++ rel.getAuxGrouping, rel.getAggCallList) + case rel: BatchPhysicalWindowAggregateBase => + (rel.grouping ++ rel.auxGrouping, rel.getAggCallList) case _ => throw new IllegalArgumentException(s"Cannot handle ${agg.getRelTypeName}!") } require(outputIdx >= fullGrouping.length) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala index 34f0ade..4b80a55 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala @@ -498,7 +498,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { * @return interval of the given column on batch window Aggregate */ def getColumnInterval( - agg: BatchExecWindowAggregateBase, + agg: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, index: Int): ValueInterval = estimateColumnIntervalOfAggregate(agg, mq, index) @@ -544,11 +544,11 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { case agg: Aggregate => AggregateUtil.checkAndGetFullGroupSet(agg) case agg: BatchExecLocalSortWindowAggregate => // grouping + assignTs + auxGrouping - agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ agg.getAuxGrouping + agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping case agg: BatchExecLocalHashWindowAggregate => // grouping + assignTs + auxGrouping - agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ agg.getAuxGrouping - case agg: BatchExecWindowAggregateBase => agg.getGrouping ++ agg.getAuxGrouping + agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping + case agg: BatchPhysicalWindowAggregateBase => agg.grouping ++ agg.auxGrouping case agg: TableAggregate => agg.getGroupSet.toArray case agg: StreamPhysicalGroupTableAggregate => agg.grouping case agg: StreamPhysicalGroupWindowTableAggregate => agg.grouping @@ -642,7 +642,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { } else { null } - case agg: BatchExecWindowAggregateBase if agg.getAggCallList.length > aggCallIndex => + case agg: BatchPhysicalWindowAggregateBase if agg.getAggCallList.length > aggCallIndex => agg.getAggCallList(aggCallIndex) case _ => null } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala index 5d42e11..c9ec8d8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -387,14 +387,14 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata } def areColumnsUnique( - rel: BatchExecWindowAggregateBase, + rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, columns: ImmutableBitSet, ignoreNulls: Boolean): JBoolean = { if (rel.isFinal) { areColumnsUniqueOnWindowAggregate( - rel.getGrouping, - rel.getNamedProperties, + rel.grouping, + rel.namedWindowProperties, rel.getRowType.getFieldCount, mq, columns, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala index ee52362..b727c73 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala @@ -399,7 +399,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate) case rel: BatchPhysicalGroupAggregateBase => FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate) - case rel: BatchExecWindowAggregateBase => + case rel: BatchPhysicalWindowAggregateBase => FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate) } @@ -427,7 +427,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata } def getDistinctRowCount( - rel: BatchExecWindowAggregateBase, + rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, groupKey: ImmutableBitSet, predicate: RexNode): JDouble = { @@ -438,7 +438,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata } val newPredicate = if (rel.isFinal) { - val namedWindowStartIndex = rel.getRowType.getFieldCount - rel.getNamedProperties.size + val namedWindowStartIndex = rel.getRowType.getFieldCount - rel.namedWindowProperties.size val groupKeyFromNamedWindow = groupKey.toList.exists(_ >= namedWindowStartIndex) if (groupKeyFromNamedWindow) { // cannot estimate DistinctRowCount result when some group keys are from named windows @@ -455,7 +455,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata } } else { // local window aggregate - val assignTsFieldIndex = rel.getGrouping.length + val assignTsFieldIndex = rel.grouping.length if (groupKey.toList.contains(assignTsFieldIndex)) { // groupKey contains `assignTs` fields return null diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala index ac37076..1742c70 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSize.scala @@ -290,11 +290,11 @@ class FlinkRelMdPopulationSize private extends MetadataHandler[BuiltInMetadata.P } def getPopulationSize( - rel: BatchExecWindowAggregateBase, + rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, groupKey: ImmutableBitSet): JDouble = { if (rel.isFinal) { - val namedWindowStartIndex = rel.getRowType.getFieldCount - rel.getNamedProperties.size + val namedWindowStartIndex = rel.getRowType.getFieldCount - rel.namedWindowProperties.size val groupKeyFromNamedWindow = groupKey.toList.exists(_ >= namedWindowStartIndex) if (groupKeyFromNamedWindow) { return null @@ -306,7 +306,7 @@ class FlinkRelMdPopulationSize private extends MetadataHandler[BuiltInMetadata.P } } else { // local window aggregate - val assignTsFieldIndex = rel.getGrouping.length + val assignTsFieldIndex = rel.grouping.length if (groupKey.toList.contains(assignTsFieldIndex)) { // groupKey contains `assignTs` fields return null diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala index 32328c5..2d48db8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala @@ -147,8 +147,8 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun val (grouping, isFinal, isMerge) = rel match { case agg: BatchPhysicalGroupAggregateBase => (ImmutableBitSet.of(agg.grouping: _*), agg.isFinal, agg.isMerge) - case windowAgg: BatchExecWindowAggregateBase => - (ImmutableBitSet.of(windowAgg.getGrouping: _*), windowAgg.isFinal, windowAgg.isMerge) + case windowAgg: BatchPhysicalWindowAggregateBase => + (ImmutableBitSet.of(windowAgg.grouping: _*), windowAgg.isFinal, windowAgg.isMerge) case _ => throw new IllegalArgumentException(s"Unknown aggregate type ${rel.getRelTypeName}!") } val ndvOfGroupKeysOnGlobalAgg: JDouble = if (grouping.isEmpty) { @@ -199,10 +199,10 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun estimateRowCountOfWindowAgg(ndvOfGroupKeys, inputRowCount, rel.getWindow) } - def getRowCount(rel: BatchExecWindowAggregateBase, mq: RelMetadataQuery): JDouble = { + def getRowCount(rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery): JDouble = { val ndvOfGroupKeys = getRowCountOfBatchExecAgg(rel, mq) val inputRowCount = mq.getRowCount(rel.getInput) - estimateRowCountOfWindowAgg(ndvOfGroupKeys, inputRowCount, rel.getWindow) + estimateRowCountOfWindowAgg(ndvOfGroupKeys, inputRowCount, rel.window) } private def estimateRowCountOfWindowAgg( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala index bc30bd0..9143aca 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala @@ -110,7 +110,7 @@ class FlinkRelMdSelectivity private extends MetadataHandler[BuiltInMetadata.Sele } def getSelectivity( - rel: BatchExecWindowAggregateBase, + rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, predicate: RexNode): JDouble = { val newPredicate = if (rel.isFinal) { @@ -131,12 +131,12 @@ class FlinkRelMdSelectivity private extends MetadataHandler[BuiltInMetadata.Sele val hasLocalAgg = agg match { case _: Aggregate => false case rel: BatchPhysicalGroupAggregateBase => rel.isFinal && rel.isMerge - case rel: BatchExecWindowAggregateBase => rel.isFinal && rel.isMerge + case rel: BatchPhysicalWindowAggregateBase => rel.isFinal && rel.isMerge case _ => throw new IllegalArgumentException(s"Cannot handle ${agg.getRelTypeName}!") } if (hasLocalAgg) { val childPredicate = agg match { - case rel: BatchExecWindowAggregateBase => + case rel: BatchPhysicalWindowAggregateBase => // set the predicate as they correspond to local window aggregate FlinkRelMdUtil.setChildPredicateOfWinAgg(predicate, rel) case _ => predicate @@ -149,7 +149,7 @@ class FlinkRelMdSelectivity private extends MetadataHandler[BuiltInMetadata.Sele FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate) case rel: BatchPhysicalGroupAggregateBase => FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate) - case rel: BatchExecWindowAggregateBase => + case rel: BatchPhysicalWindowAggregateBase => FlinkRelMdUtil.splitPredicateOnAggregate(rel, predicate) case _ => throw new IllegalArgumentException(s"Cannot handle ${agg.getRelTypeName}!") } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala index 024497f..e408203 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala @@ -202,7 +202,7 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] { } def averageColumnSizes( - rel: BatchExecWindowAggregateBase, + rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery): JList[JDouble] = { averageColumnSizesOfWindowAgg(rel, mq) } @@ -215,18 +215,18 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] { AggregateUtil.checkAndGetFullGroupSet(agg).zipWithIndex.toMap case agg: BatchExecLocalHashWindowAggregate => // local win-agg output type: grouping + assignTs + auxGrouping + aggCalls - agg.getGrouping.zipWithIndex.toMap ++ - agg.getAuxGrouping.zipWithIndex.map { - case (k, v) => k -> (agg.getGrouping.length + 1 + v) + agg.grouping.zipWithIndex.toMap ++ + agg.auxGrouping.zipWithIndex.map { + case (k, v) => k -> (agg.grouping.length + 1 + v) }.toMap case agg: BatchExecLocalSortWindowAggregate => // local win-agg output type: grouping + assignTs + auxGrouping + aggCalls - agg.getGrouping.zipWithIndex.toMap ++ - agg.getAuxGrouping.zipWithIndex.map { - case (k, v) => k -> (agg.getGrouping.length + 1 + v) + agg.grouping.zipWithIndex.toMap ++ + agg.auxGrouping.zipWithIndex.map { + case (k, v) => k -> (agg.grouping.length + 1 + v) }.toMap - case agg: BatchExecWindowAggregateBase => - (agg.getGrouping ++ agg.getAuxGrouping).zipWithIndex.toMap + case agg: BatchPhysicalWindowAggregateBase => + (agg.grouping ++ agg.auxGrouping).zipWithIndex.toMap case _ => throw new IllegalArgumentException(s"Unknown node type ${windowAgg.getRelTypeName}") } getColumnSizesFromInputOrType(windowAgg, mq, mapInputToOutput) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala index 8733d17..3a4934e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala @@ -264,12 +264,11 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] { } def getUniqueGroups( - agg: BatchExecWindowAggregateBase, + agg: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, columns: ImmutableBitSet): ImmutableBitSet = { - val grouping = agg.getGrouping - val namedProperties = agg.getNamedProperties - getUniqueGroupsOfWindowAgg(agg, grouping, agg.getAuxGrouping, namedProperties, mq, columns) + getUniqueGroupsOfWindowAgg( + agg, agg.grouping, agg.auxGrouping, agg.namedWindowProperties, mq, columns) } private def getUniqueGroupsOfWindowAgg( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index 9a9d544..312ce30 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -384,14 +384,14 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu } def getUniqueKeys( - rel: BatchExecWindowAggregateBase, + rel: BatchPhysicalWindowAggregateBase, mq: RelMetadataQuery, ignoreNulls: Boolean): util.Set[ImmutableBitSet] = { if (rel.isFinal) { getUniqueKeysOnWindowAgg( rel.getRowType.getFieldCount, - rel.getNamedProperties, - rel.getGrouping, + rel.namedWindowProperties, + rel.grouping, mq, ignoreNulls) } else { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala index b8b4ecb..f3ce9fe 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala @@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.tools.RelBuilder import java.util @@ -35,11 +34,9 @@ import scala.collection.JavaConversions._ class BatchExecHashWindowAggregate( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - inputRowType: RelDataType, aggInputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], @@ -47,16 +44,14 @@ class BatchExecHashWindowAggregate( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty], + namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean) extends BatchExecHashWindowAggregateBase( cluster, - relBuilder, traitSet, inputRel, outputRowType, - inputRowType, aggInputRowType, grouping, auxGrouping, @@ -64,7 +59,7 @@ class BatchExecHashWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge, isFinal = true) { @@ -72,11 +67,9 @@ class BatchExecHashWindowAggregate( override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new BatchExecHashWindowAggregate( cluster, - relBuilder, traitSet, inputs.get(0), getRowType, - inputRowType, aggInputRowType, grouping, auxGrouping, @@ -84,7 +77,7 @@ class BatchExecHashWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala index 1fcd892..92bfeca 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala @@ -46,11 +46,9 @@ import org.apache.flink.table.runtime.util.collections.binary.BytesMap abstract class BatchExecHashWindowAggregateBase( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - inputRowType: RelDataType, aggInputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], @@ -58,21 +56,20 @@ abstract class BatchExecHashWindowAggregateBase( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty], + namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean, isFinal: Boolean) - extends BatchExecWindowAggregateBase( + extends BatchPhysicalWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - inputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge, isFinal) @@ -109,6 +106,7 @@ abstract class BatchExecHashWindowAggregateBase( .asInstanceOf[Transformation[RowData]] val ctx = CodeGeneratorContext(config) val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) + val inputRowType = getInput.getRowType val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) val aggInfos = transformToBatchAggregateInfoList( @@ -120,8 +118,8 @@ abstract class BatchExecHashWindowAggregateBase( val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window) val generatedOperator = new HashWindowCodeGenerator( - ctx, relBuilder, window, inputTimeFieldIndex, - inputTimeIsDate, namedProperties, + ctx, planner.getRelBuilder, window, inputTimeFieldIndex, + inputTimeIsDate, namedWindowProperties, aggInfos, inputRowType, grouping, auxGrouping, enableAssignPane, isMerge, isFinal).gen( inputType, outputType, groupBufferLimitSize, 0, windowSize, slideSize) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala index c15d323..2e19199 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala @@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.tools.RelBuilder import java.util @@ -35,7 +34,6 @@ import scala.collection.JavaConversions._ class BatchExecLocalHashWindowAggregate( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, @@ -46,23 +44,21 @@ class BatchExecLocalHashWindowAggregate( window: LogicalWindow, val inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty], + namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false) extends BatchExecHashWindowAggregateBase( cluster, - relBuilder, traitSet, inputRel, outputRowType, inputRowType, - inputRowType, grouping, auxGrouping, aggCallToAggFunction, window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge = false, isFinal = false) { @@ -70,7 +66,6 @@ class BatchExecLocalHashWindowAggregate( override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new BatchExecLocalHashWindowAggregate( cluster, - relBuilder, traitSet, inputs.get(0), getRowType, @@ -81,7 +76,7 @@ class BatchExecLocalHashWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala index e9a3e89..28a2ea3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala @@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.tools.RelBuilder import java.util @@ -35,7 +34,6 @@ import scala.collection.JavaConversions._ class BatchExecLocalSortWindowAggregate( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, @@ -46,23 +44,21 @@ class BatchExecLocalSortWindowAggregate( window: LogicalWindow, val inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty], + namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false) extends BatchExecSortWindowAggregateBase( cluster, - relBuilder, traitSet, inputRel, outputRowType, inputRowType, - inputRowType, grouping, auxGrouping, aggCallToAggFunction, window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge = false, isFinal = false) { @@ -70,7 +66,6 @@ class BatchExecLocalSortWindowAggregate( override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new BatchExecLocalSortWindowAggregate( cluster, - relBuilder, traitSet, inputs.get(0), outputRowType, @@ -81,7 +76,7 @@ class BatchExecLocalSortWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala index ab3ce2c..ba84b24 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonGroupWindowAggregate.scala @@ -45,7 +45,6 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.tools.RelBuilder import java.util @@ -56,7 +55,6 @@ import scala.collection.JavaConversions._ */ class BatchExecPythonGroupWindowAggregate( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, @@ -68,18 +66,17 @@ class BatchExecPythonGroupWindowAggregate( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty]) - extends BatchExecWindowAggregateBase( + namedWindowProperties: Seq[PlannerNamedWindowProperty]) + extends BatchPhysicalWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - inputRowType, grouping, auxGrouping, aggCalls.zip(aggFunctions), window, - namedProperties, + namedWindowProperties, false, false, true) @@ -89,7 +86,6 @@ class BatchExecPythonGroupWindowAggregate( override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new BatchExecPythonGroupWindowAggregate( cluster, - relBuilder, traitSet, inputs.get(0), outputRowType, @@ -101,7 +97,7 @@ class BatchExecPythonGroupWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties) + namedWindowProperties) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { @@ -160,7 +156,7 @@ class BatchExecPythonGroupWindowAggregate( windowSize: Long, slideSize: Long, config: Configuration): OneInputTransformation[RowData, RowData] = { - val namePropertyTypeArray = namedProperties.map { + val namePropertyTypeArray = namedWindowProperties.map { case PlannerNamedWindowProperty(_, p) => p match { case PlannerWindowStart(_) => 0 case PlannerWindowEnd(_) => 1 @@ -199,7 +195,7 @@ class BatchExecPythonGroupWindowAggregate( maxLimitSize: Int, windowSize: Long, slideSize: Long, - namedProperties: Array[Int], + namedWindowProperties: Array[Int], udafInputOffsets: Array[Int], pythonFunctionInfos: Array[PythonFunctionInfo]): OneInputStreamOperator[RowData, RowData] = { val clazz = loadClass(ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME) @@ -227,7 +223,7 @@ class BatchExecPythonGroupWindowAggregate( Integer.valueOf(maxLimitSize), java.lang.Long.valueOf(windowSize), java.lang.Long.valueOf(slideSize), - namedProperties, + namedWindowProperties, grouping, grouping ++ auxGrouping, udafInputOffsets) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala index 9eb0f97..d0552b7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala @@ -27,7 +27,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.tools.RelBuilder import java.util @@ -35,11 +34,9 @@ import scala.collection.JavaConversions._ class BatchExecSortWindowAggregate( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - inputRowType: RelDataType, aggInputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], @@ -47,16 +44,14 @@ class BatchExecSortWindowAggregate( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty], + namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean) extends BatchExecSortWindowAggregateBase( cluster, - relBuilder, traitSet, inputRel, outputRowType, - inputRowType, aggInputRowType, grouping, auxGrouping, @@ -64,7 +59,7 @@ class BatchExecSortWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge, isFinal = true) { @@ -72,11 +67,9 @@ class BatchExecSortWindowAggregate( override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new BatchExecSortWindowAggregate( cluster, - relBuilder, traitSet, inputs.get(0), getRowType, - inputRowType, aggInputRowType, grouping, auxGrouping, @@ -84,7 +77,7 @@ class BatchExecSortWindowAggregate( window, inputTimeFieldIndex, inputTimeIsDate, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala index 64c1b0e..5ad4827 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala @@ -40,15 +40,12 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.tools.RelBuilder abstract class BatchExecSortWindowAggregateBase( cluster: RelOptCluster, - relBuilder: RelBuilder, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - inputRowType: RelDataType, aggInputRowType: RelDataType, grouping: Array[Int], auxGrouping: Array[Int], @@ -56,21 +53,20 @@ abstract class BatchExecSortWindowAggregateBase( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[PlannerNamedWindowProperty], + namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean, isFinal: Boolean) - extends BatchExecWindowAggregateBase( + extends BatchPhysicalWindowAggregateBase( cluster, traitSet, inputRel, outputRowType, - inputRowType, grouping, auxGrouping, aggCallToAggFunction, window, - namedProperties, + namedWindowProperties, enableAssignPane, isMerge, isFinal) @@ -98,6 +94,7 @@ abstract class BatchExecSortWindowAggregateBase( .asInstanceOf[Transformation[RowData]] val ctx = CodeGeneratorContext(planner.getTableConfig) val outputType = FlinkTypeFactory.toLogicalRowType(getRowType) + val inputRowType = getInput().getRowType val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType) val aggInfos = transformToBatchAggregateInfoList( @@ -109,8 +106,8 @@ abstract class BatchExecSortWindowAggregateBase( val (windowSize: Long, slideSize: Long) = WindowCodeGenerator.getWindowDef(window) val generator = new SortWindowCodeGenerator( - ctx, relBuilder, window, inputTimeFieldIndex, - inputTimeIsDate, namedProperties, + ctx, planner.getRelBuilder, window, inputTimeFieldIndex, + inputTimeIsDate, namedWindowProperties, aggInfos, inputRowType, inputType, outputType, groupBufferLimitSize, 0L, windowSize, slideSize, grouping, auxGrouping, enableAssignPane, isMerge, isFinal) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalWindowAggregateBase.scala similarity index 83% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalWindowAggregateBase.scala index 63ba6af..d87002b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalWindowAggregateBase.scala @@ -29,17 +29,19 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -abstract class BatchExecWindowAggregateBase( +/** + * Base batch group window aggregate physical node. + */ +abstract class BatchPhysicalWindowAggregateBase( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, outputRowType: RelDataType, - inputRowType: RelDataType, - grouping: Array[Int], - auxGrouping: Array[Int], + val grouping: Array[Int], + val auxGrouping: Array[Int], aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)], - window: LogicalWindow, - namedProperties: Seq[PlannerNamedWindowProperty], + val window: LogicalWindow, + val namedWindowProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = true, val isMerge: Boolean, val isFinal: Boolean) @@ -50,25 +52,19 @@ abstract class BatchExecWindowAggregateBase( throw new TableException("auxGrouping should be empty if grouping is empty.") } - def getGrouping: Array[Int] = grouping - - def getAuxGrouping: Array[Int] = auxGrouping - - def getWindow: LogicalWindow = window - - def getNamedProperties: Seq[PlannerNamedWindowProperty] = namedProperties - def getAggCallList: Seq[AggregateCall] = aggCallToAggFunction.map(_._1) override def deriveRowType(): RelDataType = outputRowType override def explainTerms(pw: RelWriter): RelWriter = { + val inputRowType = getInput.getRowType super.explainTerms(pw) .itemIf("groupBy", RelExplainUtil.fieldToString(grouping, inputRowType), grouping.nonEmpty) .itemIf("auxGrouping", RelExplainUtil.fieldToString(auxGrouping, inputRowType), auxGrouping.nonEmpty) .item("window", window) - .itemIf("properties", namedProperties.map(_.name).mkString(", "), namedProperties.nonEmpty) + .itemIf("properties", + namedWindowProperties.map(_.name).mkString(", "), namedWindowProperties.nonEmpty) .item("select", RelExplainUtil.windowAggregationToString( inputRowType, grouping, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala index 1c7b2bb..bbd96ac 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala @@ -182,7 +182,6 @@ class BatchExecWindowAggregateRule new BatchExecLocalHashWindowAggregate( agg.getCluster, - call.builder(), localProvidedTraitSet, newLocalInput, localAggRelType, @@ -205,7 +204,6 @@ class BatchExecWindowAggregateRule new BatchExecLocalSortWindowAggregate( agg.getCluster, - call.builder(), localProvidedTraitSet, newLocalInput, localAggRelType, @@ -238,11 +236,9 @@ class BatchExecWindowAggregateRule new BatchExecHashWindowAggregate( agg.getCluster, - call.builder(), aggProvidedTraitSet, newGlobalAggInput, agg.getRowType, - newGlobalAggInput.getRowType, input.getRowType, groupSet.indices.toArray, // auxGroupSet starts from `size of groupSet + 1(assignTs)` @@ -263,11 +259,9 @@ class BatchExecWindowAggregateRule new BatchExecSortWindowAggregate( agg.getCluster, - call.builder(), aggProvidedTraitSet, newGlobalAggInput, agg.getRowType, - newGlobalAggInput.getRowType, input.getRowType, groupSet.indices.toArray, // auxGroupSet starts from `size of groupSet + 1(assignTs)` @@ -301,12 +295,10 @@ class BatchExecWindowAggregateRule new BatchExecHashWindowAggregate( agg.getCluster, - call.builder(), aggProvidedTraitSet, newInput, agg.getRowType, newInput.getRowType, - newInput.getRowType, groupSet, auxGroupSet, aggCallToAggFunction, @@ -324,12 +316,10 @@ class BatchExecWindowAggregateRule new BatchExecSortWindowAggregate( agg.getCluster, - call.builder(), aggProvidedTraitSet, newInput, agg.getRowType, newInput.getRowType, - newInput.getRowType, groupSet, auxGroupSet, aggCallToAggFunction, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala index 321abcd..2afbf0c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.planner.JDouble import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate} -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchExecWindowAggregateBase, BatchPhysicalGroupAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecLocalHashWindowAggregate, BatchExecLocalSortWindowAggregate, BatchPhysicalGroupAggregateBase, BatchPhysicalWindowAggregateBase} import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange} import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES @@ -174,12 +174,12 @@ object FlinkRelMdUtil { * a predicate that stores NamedProperties predicate's selectivity */ def makeNamePropertiesSelectivityRexNode( - globalWinAgg: BatchExecWindowAggregateBase, + globalWinAgg: BatchPhysicalWindowAggregateBase, predicate: RexNode): RexNode = { require(globalWinAgg.isFinal, "local window agg does not contain NamedProperties!") - val fullGrouping = globalWinAgg.getGrouping ++ globalWinAgg.getAuxGrouping + val fullGrouping = globalWinAgg.grouping ++ globalWinAgg.auxGrouping makeNamePropertiesSelectivityRexNode( - globalWinAgg, fullGrouping, globalWinAgg.getNamedProperties, predicate) + globalWinAgg, fullGrouping, globalWinAgg.namedWindowProperties, predicate) } /** @@ -321,7 +321,7 @@ object FlinkRelMdUtil { */ def setAggChildKeys( groupKey: ImmutableBitSet, - aggRel: BatchExecWindowAggregateBase): (ImmutableBitSet, Array[AggregateCall]) = { + aggRel: BatchPhysicalWindowAggregateBase): (ImmutableBitSet, Array[AggregateCall]) = { require(!aggRel.isFinal || !aggRel.isMerge, "Cannot handle global agg which has local agg!") setChildKeysOfAgg(groupKey, aggRel) } @@ -333,13 +333,13 @@ object FlinkRelMdUtil { case agg: BatchExecLocalSortWindowAggregate => // grouping + assignTs + auxGrouping (agg.getAggCallList, - agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ agg.getAuxGrouping) + agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) case agg: BatchExecLocalHashWindowAggregate => // grouping + assignTs + auxGrouping (agg.getAggCallList, - agg.getGrouping ++ Array(agg.inputTimeFieldIndex) ++ agg.getAuxGrouping) - case agg: BatchExecWindowAggregateBase => - (agg.getAggCallList, agg.getGrouping ++ agg.getAuxGrouping) + agg.grouping ++ Array(agg.inputTimeFieldIndex) ++ agg.auxGrouping) + case agg: BatchPhysicalWindowAggregateBase => + (agg.getAggCallList, agg.grouping ++ agg.auxGrouping) case agg: BatchPhysicalGroupAggregateBase => (agg.getAggCallList, agg.grouping ++ agg.auxGrouping) case _ => throw new IllegalArgumentException(s"Unknown aggregate: ${agg.getRelTypeName}") @@ -373,11 +373,11 @@ object FlinkRelMdUtil { */ def setChildKeysOfWinAgg( groupKey: ImmutableBitSet, - globalWinAgg: BatchExecWindowAggregateBase): ImmutableBitSet = { + globalWinAgg: BatchPhysicalWindowAggregateBase): ImmutableBitSet = { require(globalWinAgg.isMerge, "Cannot handle global agg which does not have local window agg!") val childKeyBuilder = ImmutableBitSet.builder groupKey.toArray.foreach { key => - if (key < globalWinAgg.getGrouping.length) { + if (key < globalWinAgg.grouping.length) { childKeyBuilder.set(key) } else { // skips `assignTs` @@ -422,9 +422,9 @@ object FlinkRelMdUtil { val (childKeys, aggCalls) = setAggChildKeys(groupKey, rel) val childKeyExcludeAuxKey = removeAuxKey(childKeys, rel.grouping, rel.auxGrouping) (childKeyExcludeAuxKey, aggCalls) - case rel: BatchExecWindowAggregateBase => + case rel: BatchPhysicalWindowAggregateBase => val (childKeys, aggCalls) = setAggChildKeys(groupKey, rel) - val childKeyExcludeAuxKey = removeAuxKey(childKeys, rel.getGrouping, rel.getAuxGrouping) + val childKeyExcludeAuxKey = removeAuxKey(childKeys, rel.grouping, rel.auxGrouping) (childKeyExcludeAuxKey, aggCalls) case _ => throw new IllegalArgumentException(s"Unknown aggregate: ${agg.getRelTypeName}.") } @@ -471,9 +471,9 @@ object FlinkRelMdUtil { * Note, pushable condition will be converted based on the input field position. */ def splitPredicateOnAggregate( - agg: BatchExecWindowAggregateBase, + agg: BatchPhysicalWindowAggregateBase, predicate: RexNode): (Option[RexNode], Option[RexNode]) = { - splitPredicateOnAgg(agg.getGrouping ++ agg.getAuxGrouping, agg, predicate) + splitPredicateOnAgg(agg.grouping ++ agg.auxGrouping, agg, predicate) } /** @@ -488,13 +488,13 @@ object FlinkRelMdUtil { */ def setChildPredicateOfWinAgg( predicate: RexNode, - globalWinAgg: BatchExecWindowAggregateBase): RexNode = { + globalWinAgg: BatchPhysicalWindowAggregateBase): RexNode = { require(globalWinAgg.isMerge, "Cannot handle global agg which does not have local window agg!") if (predicate == null) { return null } // grouping + assignTs + auxGrouping - val fullGrouping = globalWinAgg.getGrouping ++ globalWinAgg.getAuxGrouping + val fullGrouping = globalWinAgg.grouping ++ globalWinAgg.auxGrouping // skips `assignTs` RexUtil.shift(predicate, fullGrouping.length, 1) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index cd3e5c0..5a5de0c 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -1246,7 +1246,6 @@ class FlinkRelMdHandlerTestBase { localWindowAggTypes, localWindowAggNames) val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate( batchCalc.getCluster, - relBuilder, batchPhysicalTraits, batchCalc, localWindowAggRowType, @@ -1263,11 +1262,9 @@ class FlinkRelMdHandlerTestBase { cluster, batchPhysicalTraits.replace(hash01), batchLocalWindowAgg, hash01) val batchWindowAggWithLocal = new BatchExecHashWindowAggregate( cluster, - relBuilder, batchPhysicalTraits, batchExchange2, flinkLogicalWindowAgg.getRowType, - batchExchange2.getRowType, batchCalc.getRowType, Array(0, 1), Array.empty, @@ -1282,12 +1279,10 @@ class FlinkRelMdHandlerTestBase { val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate( batchExchange1.getCluster, - relBuilder, batchPhysicalTraits, batchExchange1, flinkLogicalWindowAgg.getRowType, batchExchange1.getRowType, - batchExchange1.getRowType, Array(0, 1), Array.empty, aggCallToAggFunction, @@ -1390,7 +1385,6 @@ class FlinkRelMdHandlerTestBase { localWindowAggTypes, localWindowAggNames) val batchLocalWindowAgg = new BatchExecLocalHashWindowAggregate( batchCalc.getCluster, - relBuilder, batchPhysicalTraits, batchCalc, localWindowAggRowType, @@ -1407,11 +1401,9 @@ class FlinkRelMdHandlerTestBase { cluster, batchPhysicalTraits.replace(hash1), batchLocalWindowAgg, hash1) val batchWindowAggWithLocal = new BatchExecHashWindowAggregate( cluster, - relBuilder, batchPhysicalTraits, batchExchange2, flinkLogicalWindowAgg.getRowType, - batchExchange2.getRowType, batchCalc.getRowType, Array(0), Array.empty, @@ -1426,12 +1418,10 @@ class FlinkRelMdHandlerTestBase { val batchWindowAggWithoutLocal = new BatchExecHashWindowAggregate( batchExchange1.getCluster, - relBuilder, batchPhysicalTraits, batchExchange1, flinkLogicalWindowAgg.getRowType, batchExchange1.getRowType, - batchExchange1.getRowType, Array(1), Array.empty, aggCallToAggFunction, @@ -1539,7 +1529,6 @@ class FlinkRelMdHandlerTestBase { localWindowAggTypes, localWindowAggNames) val batchLocalWindowAggWithAuxGroup = new BatchExecLocalHashWindowAggregate( batchCalc.getCluster, - relBuilder, batchPhysicalTraits, batchCalc, localWindowAggRowType, @@ -1556,11 +1545,9 @@ class FlinkRelMdHandlerTestBase { cluster, batchPhysicalTraits.replace(hash0), batchLocalWindowAggWithAuxGroup, hash0) val batchWindowAggWithLocalWithAuxGroup = new BatchExecHashWindowAggregate( cluster, - relBuilder, batchPhysicalTraits, batchExchange2, flinkLogicalWindowAggWithAuxGroup.getRowType, - batchExchange2.getRowType, batchCalc.getRowType, Array(0), Array(2), // local output grouping keys: grouping + assignTs + auxGrouping @@ -1575,12 +1562,10 @@ class FlinkRelMdHandlerTestBase { val batchWindowAggWithoutLocalWithAuxGroup = new BatchExecHashWindowAggregate( batchExchange1.getCluster, - relBuilder, batchPhysicalTraits, batchExchange1, flinkLogicalWindowAggWithAuxGroup.getRowType, batchExchange1.getRowType, - batchExchange1.getRowType, Array(0), Array(1), aggCallToAggFunction,
