godfreyhe commented on a change in pull request #14894: URL: https://github.com/apache/flink/pull/14894#discussion_r628668576
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.abilities.source; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A sub-class of {@link SourceAbilitySpec} 
that can not only serialize/deserialize the aggregation + * to/from JSON, but also can push the filter into a {@link SupportsAggregatePushDown}. Review comment: but also can push the local aggregate into a {@link SupportsAggregatePushDown} ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + RelDataType originalInputRowType = oldScan.deriveRowType(); + AggregateInfoList aggInfoList = + AggregateUtil.transformToBatchAggregateInfoList( + FlinkTypeFactory.toLogicalRowType(localAgg.getInput().getRowType()), + localAgg.getAggCallList(), + null, + null); + if (aggInfoList.aggInfos().length == 0) { + // no agg function need to be pushed down + return; + } + + List<int[]> groupingSets = Collections.singletonList(localAgg.grouping()); + List<AggregateExpression> aggExpressions = + buildAggregateExpressions(originalInputRowType, aggInfoList); + RelDataType relDataType = localAgg.deriveRowType(); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + groupingSets, + aggExpressions, + (RowType) FlinkTypeFactory.toLogicalType(relDataType), + newTableSource); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. 
+ return; + } + + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable); + String[] newExtraDigests = + getNewExtraDigests(originalInputRowType, localAgg.grouping(), aggExpressions); + AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec( + groupingSets, + aggExpressions, + (RowType) FlinkTypeFactory.toLogicalType(relDataType)); + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + newFlinkStatistic, + newExtraDigests, + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(relDataType); + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private List<AggregateExpression> buildAggregateExpressions( + RelDataType originalInputRowType, AggregateInfoList aggInfoList) { + List<AggregateExpression> aggExpressions = new ArrayList<>(); + for (AggregateInfo aggInfo : aggInfoList.aggInfos()) { + List<FieldReferenceExpression> arguments = new ArrayList<>(1); + for (int argIndex : aggInfo.argIndexes()) { + DataType argType = + TypeConversions.fromLogicalToDataType( + FlinkTypeFactory.toLogicalType( + originalInputRowType + .getFieldList() + .get(argIndex) + .getType())); + FieldReferenceExpression field = + new FieldReferenceExpression( + originalInputRowType.getFieldNames().get(argIndex), + argType, + argIndex, Review comment: the third parameter is `inputIndex` not `argIndex`. 
It should always be 0, because the local aggregate has only one input. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { Review comment: ` aggCall.getCollation().getFieldCollations().isEmpty()` ? 
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + RelDataType originalInputRowType = oldScan.deriveRowType(); + AggregateInfoList aggInfoList = + AggregateUtil.transformToBatchAggregateInfoList( + FlinkTypeFactory.toLogicalRowType(localAgg.getInput().getRowType()), + localAgg.getAggCallList(), + null, + null); + if (aggInfoList.aggInfos().length == 0) { + // no agg function need to be pushed down + return; + } + + List<int[]> groupingSets = Collections.singletonList(localAgg.grouping()); Review comment: we should also consider `auxGrouping` here ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggWithSortIntoTableSourceScanRule.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; + +/** + * Planner rule that tries to push a local sort aggregate which with sort into a {@link + * BatchPhysicalTableSourceScan} which table is a {@link TableSourceTable}. And the table source in + * the table is a {@link SupportsAggregatePushDown}. + * + * <p>When the {@code OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} is + * true, we have the original physical plan: + * + * <pre>{@code + * BatchPhysicalSortAggregate (global) + * +- Sort (exists if group keys are not empty) + * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) + * +- BatchPhysicalLocalSortAggregate (local) + * +- Sort (exists if group keys are not empty) + * +- BatchPhysicalTableSourceScan + * }</pre> + * + * <p>This physical plan will be rewritten to: + * + * <pre>{@code + * BatchPhysicalSortAggregate (global) + * +- Sort (exists if group keys are not empty) + * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) + * +- BatchPhysicalTableSourceScan (with local aggregate pushed down) + * }</pre> + */ +public class PushLocalAggWithSortIntoTableSourceScanRule Review comment: rename to PushLocalSortAggWithSortIntoScanRule? 
which is clearer ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { Review comment: nit: aggregate.getAggCallList().isEmpty() ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import 
org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. 
+ * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); Review comment: nit: use `JavaScalaConversionUtil.toJava(aggregate.getAggCallList())` instead ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + RelDataType originalInputRowType = oldScan.deriveRowType(); Review comment: please use public method: `getRowType` instead of protected method `deriveRowType()` ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggWithoutSortIntoTableSourceScanRule.java ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; + +/** + * Planner rule that tries to push a local hash or sort aggregate which without sort into a {@link + * BatchPhysicalTableSourceScan} which table is a {@link TableSourceTable}. And the table source in + * the table is a {@link SupportsAggregatePushDown}. + * + * <p>When the {@code OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} is + * true, we have the original physical plan: + * + * <pre>{@code + * BatchPhysicalHashAggregate (global) + * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) + * +- BatchPhysicalGroupAggregateBase (local) + * +- BatchPhysicalTableSourceScan + * }</pre> + * + * <p>This physical plan will be rewritten to: + * + * <pre>{@code + * BatchPhysicalHashAggregate (global) + * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) + * +- BatchPhysicalTableSourceScan (with local aggregate pushed down) + * }</pre> + */ +public class PushLocalAggWithoutSortIntoTableSourceScanRule Review comment: It would be better to split this rule into two rules, PushLocalSortAggWithoutSortIntoScanRule and PushLocalHashAggIntoScanRule, which would be clearer. 
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + RelDataType originalInputRowType = oldScan.deriveRowType(); + AggregateInfoList aggInfoList = + AggregateUtil.transformToBatchAggregateInfoList( + FlinkTypeFactory.toLogicalRowType(localAgg.getInput().getRowType()), + localAgg.getAggCallList(), + null, + null); + if (aggInfoList.aggInfos().length == 0) { + // no agg function need to be pushed down + return; + } + + List<int[]> groupingSets = Collections.singletonList(localAgg.grouping()); + List<AggregateExpression> aggExpressions = + buildAggregateExpressions(originalInputRowType, aggInfoList); + RelDataType relDataType = localAgg.deriveRowType(); Review comment: getRowType ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggWithSortIntoTableSourceScanRule.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; + +/** + * Planner rule that tries to push a local sort aggregate which with sort into a {@link + * BatchPhysicalTableSourceScan} which table is a {@link TableSourceTable}. And the table source in + * the table is a {@link SupportsAggregatePushDown}. + * + * <p>When the {@code OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} is Review comment: The first half of this sentence can be moved to the previous paragraph. use {@link xx} instead of {@code xx} ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.scala ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql.agg + +import org.apache.flink.table.api.config.OptimizerConfigOptions +import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row +import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} +import org.junit.{Before, Test} + +class LocalAggregatePushDownITCase extends BatchTestBase { Review comment: please add more tests to cover the cases mentioned in `PushLocalAggIntoTableSourceScanRuleTest` ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** + * Test for {@link PushLocalAggWithoutSortIntoTableSourceScanRule} and {@link + * PushLocalAggWithSortIntoTableSourceScanRule}. + */ +public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase { Review comment: please add some tests to cover the following cases: 1. local sort agg (with or without sort) 2. local hash agg 3. user defined functions 4. table.optimizer.source.aggregate-pushdown-enabled is true or false 5. filter is pushed down 6. limit is pushed down ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { Review comment: PushLocalAggIntoScanRuleBase ? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); Review comment: Another thing we should consider is the behavior of a table source with multiple push-downs, e.g. when a filter or limit has already been pushed down before this rule is applied. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + RelDataType originalInputRowType = oldScan.deriveRowType(); + AggregateInfoList aggInfoList = + AggregateUtil.transformToBatchAggregateInfoList( + FlinkTypeFactory.toLogicalRowType(localAgg.getInput().getRowType()), + localAgg.getAggCallList(), + null, + null); + if (aggInfoList.aggInfos().length == 0) { + // no agg function need to be pushed down + return; + } + + List<int[]> groupingSets = Collections.singletonList(localAgg.grouping()); + List<AggregateExpression> aggExpressions = + buildAggregateExpressions(originalInputRowType, aggInfoList); + RelDataType relDataType = localAgg.deriveRowType(); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + groupingSets, + aggExpressions, + (RowType) FlinkTypeFactory.toLogicalType(relDataType), + newTableSource); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. 
+ return; + } + + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable); + String[] newExtraDigests = + getNewExtraDigests(originalInputRowType, localAgg.grouping(), aggExpressions); + AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec( + groupingSets, + aggExpressions, + (RowType) FlinkTypeFactory.toLogicalType(relDataType)); + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + newFlinkStatistic, + newExtraDigests, + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(relDataType); + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private List<AggregateExpression> buildAggregateExpressions( + RelDataType originalInputRowType, AggregateInfoList aggInfoList) { + List<AggregateExpression> aggExpressions = new ArrayList<>(); + for (AggregateInfo aggInfo : aggInfoList.aggInfos()) { + List<FieldReferenceExpression> arguments = new ArrayList<>(1); + for (int argIndex : aggInfo.argIndexes()) { + DataType argType = + TypeConversions.fromLogicalToDataType( + FlinkTypeFactory.toLogicalType( + originalInputRowType + .getFieldList() + .get(argIndex) + .getType())); + FieldReferenceExpression field = + new FieldReferenceExpression( + originalInputRowType.getFieldNames().get(argIndex), + argType, + argIndex, + argIndex); + arguments.add(field); + } + if (aggInfo.function() instanceof AvgAggFunction) { + Tuple2<Sum0AggFunction, CountAggFunction> sum0AndCountFunction = + AggregateUtil.deriveSumAndCountFromAvg(aggInfo.function()); + AggregateExpression sum0Expression = + new AggregateExpression( + sum0AndCountFunction._1(), + arguments, + null, + aggInfo.externalResultType(), + aggInfo.agg().isDistinct(), + 
aggInfo.agg().isApproximate(), + aggInfo.agg().ignoreNulls()); + aggExpressions.add(sum0Expression); + AggregateExpression countExpression = + new AggregateExpression( + sum0AndCountFunction._2(), + arguments, + null, + aggInfo.externalResultType(), + aggInfo.agg().isDistinct(), + aggInfo.agg().isApproximate(), + aggInfo.agg().ignoreNulls()); + aggExpressions.add(countExpression); + } else { + AggregateExpression aggregateExpression = + new AggregateExpression( + aggInfo.function(), + arguments, + null, + aggInfo.externalResultType(), + aggInfo.agg().isDistinct(), + aggInfo.agg().isApproximate(), + aggInfo.agg().ignoreNulls()); + aggExpressions.add(aggregateExpression); + } + } + return aggExpressions; + } + + private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + FlinkStatistic newStatistic; + if (oldStatistic == FlinkStatistic.UNKNOWN()) { + newStatistic = oldStatistic; + } else { + // Remove tableStats after all of aggregate have been pushed down + newStatistic = + FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build(); + } + return newStatistic; + } + + private String[] getNewExtraDigests( + RelDataType originalInputRowType, + int[] grouping, + List<AggregateExpression> aggregateExpressions) { + String extraDigest; + String groupingStr = "null"; + if (grouping.length > 0) { + groupingStr = + Arrays.stream(grouping) + .mapToObj(index -> originalInputRowType.getFieldNames().get(index)) + .collect(Collectors.joining(",")); + } + String aggFunctionsStr = "null"; + if (aggregateExpressions.size() > 0) { + aggFunctionsStr = + aggregateExpressions.stream() + .map(AggregateExpression::asSummaryString) + .collect(Collectors.joining(",")); + } + extraDigest = + "aggregates=[grouping=[" + + groupingStr Review comment: it's a little strange about `grouping=[null]` when grouping is empty. 
`grouping=[]` looks better ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala ########## @@ -274,6 +276,22 @@ object AggregateUtil extends Enumeration { needDistinctInfo = true) } + def deriveSumAndCountFromAvg( + avgAggFunction: UserDefinedFunction): (Sum0AggFunction, CountAggFunction) = { + avgAggFunction match { + case _: ByteAvgAggFunction => (new ByteSum0AggFunction, new CountAggFunction) + case _: ShortAvgAggFunction => (new ShortSum0AggFunction, new CountAggFunction) + case _: IntAvgAggFunction => (new IntSum0AggFunction, new CountAggFunction) + case _: LongAvgAggFunction => (new LongSum0AggFunction, new CountAggFunction) + case _: FloatAvgAggFunction => (new FloatSum0AggFunction, new CountAggFunction) + case _: DoubleAvgAggFunction => (new DoubleSum0AggFunction, new CountAggFunction) + case _ => { Review comment: nit: redundant bracket ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ########## @@ -816,32 +833,115 @@ public String asSummaryString() { } protected Collection<RowData> convertToRowData(DataStructureConverter converter) { - List<RowData> result = new ArrayList<>(); + List<Row> resultBuffer = new ArrayList<>(); List<Map<String, String>> keys = allPartitions.isEmpty() ? 
Collections.singletonList(Collections.emptyMap()) : allPartitions; int numRetained = 0; + boolean overLimit = false; for (Map<String, String> partition : keys) { for (Row row : data.get(partition)) { - if (result.size() >= limit) { - return result; + if (resultBuffer.size() >= limit) { + overLimit = true; + break; } boolean isRetained = FilterUtils.isRetainedAfterApplyingFilterPredicates( filterPredicates, getValueGetter(row)); if (isRetained) { final Row projectedRow = projectRow(row); - final RowData rowData = (RowData) converter.toInternal(projectedRow); - if (rowData != null) { - if (numRetained >= numElementToSkip) { - rowData.setRowKind(row.getKind()); - result.add(rowData); - } - numRetained++; - } + resultBuffer.add(projectedRow); + } + } + if (overLimit) { + break; + } + } + // simulate aggregate operation + if (!aggregateExpressions.isEmpty()) { + resultBuffer = applyAggregatesToRows(resultBuffer); + } + List<RowData> result = new ArrayList<>(); + for (Row row : resultBuffer) { + final RowData rowData = (RowData) converter.toInternal(row); + if (rowData != null) { + if (numRetained >= numElementToSkip) { + rowData.setRowKind(row.getKind()); + result.add(rowData); + } + numRetained++; + } + } + return result; + } + + private List<Row> applyAggregatesToRows(List<Row> rows) { + if (groupingSet != null && groupingSet.length > 0) { + // has group by, group firstly + Map<Row, List<Row>> buffer = new HashMap<>(); + for (Row row : rows) { + Row bufferKey = new Row(groupingSet.length); + for (int i = 0; i < groupingSet.length; i++) { + bufferKey.setField(i, row.getField(groupingSet[i])); + } + if (buffer.containsKey(bufferKey)) { + buffer.get(bufferKey).add(row); + } else { + buffer.put(bufferKey, new ArrayList<>(Collections.singletonList(row))); } } + List<Row> result = new ArrayList<>(); + for (Map.Entry<Row, List<Row>> entry : buffer.entrySet()) { + result.add(Row.join(entry.getKey(), accumulateRows(entry.getValue()))); + } + return result; + } else { + return 
Collections.singletonList(accumulateRows(rows)); + } + } + + // can only apply sum/sum0/avg function for long type fields for testing + private Row accumulateRows(List<Row> rows) { Review comment: add limitation for the `applyAggregates` method, which can only support long aggregations? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.abilities.source; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation + * to/from JSON, but also can push the filter into a {@link SupportsAggregatePushDown}. + */ +@JsonTypeName("AggregatePushDown") +public class AggregatePushDownSpec extends SourceAbilitySpecBase { + + public static final String FIELD_NAME_GROUPING_SETS = "groupingSets"; + + public static final String FIELD_NAME_AGGREGATE_EXPRESSIONS = "aggregateExpressions"; + + @JsonProperty(FIELD_NAME_GROUPING_SETS) + private final List<int[]> groupingSets; + + @JsonProperty(FIELD_NAME_AGGREGATE_EXPRESSIONS) + private final List<AggregateExpression> aggregateExpressions; Review comment: `AggregateExpression` can not be serialized to json, please use AggregateCall here. 
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleBase.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import 
org.apache.calcite.rel.type.RelDataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * which table is a {@link TableSourceTable}. And the table source in the table is a {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoTableSourceScanRuleBase extends RelOptRule { + + public PushLocalAggIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean isMatch( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().size() < 1) { + return false; + } + List<AggregateCall> aggCallList = + JavaConverters.seqAsJavaListConverter(aggregate.getAggCallList()).asJava(); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can 
not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + RelDataType originalInputRowType = oldScan.deriveRowType(); + AggregateInfoList aggInfoList = + AggregateUtil.transformToBatchAggregateInfoList( + FlinkTypeFactory.toLogicalRowType(localAgg.getInput().getRowType()), + localAgg.getAggCallList(), + null, + null); + if (aggInfoList.aggInfos().length == 0) { + // no agg function need to be pushed down + return; + } + + List<int[]> groupingSets = Collections.singletonList(localAgg.grouping()); + List<AggregateExpression> aggExpressions = + buildAggregateExpressions(originalInputRowType, aggInfoList); + RelDataType relDataType = localAgg.deriveRowType(); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + groupingSets, + aggExpressions, + (RowType) FlinkTypeFactory.toLogicalType(relDataType), + newTableSource); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. 
+ return; + } + + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable); + String[] newExtraDigests = + getNewExtraDigests(originalInputRowType, localAgg.grouping(), aggExpressions); + AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec( + groupingSets, + aggExpressions, + (RowType) FlinkTypeFactory.toLogicalType(relDataType)); + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + newFlinkStatistic, + newExtraDigests, + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(relDataType); + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private List<AggregateExpression> buildAggregateExpressions( + RelDataType originalInputRowType, AggregateInfoList aggInfoList) { + List<AggregateExpression> aggExpressions = new ArrayList<>(); + for (AggregateInfo aggInfo : aggInfoList.aggInfos()) { + List<FieldReferenceExpression> arguments = new ArrayList<>(1); + for (int argIndex : aggInfo.argIndexes()) { + DataType argType = + TypeConversions.fromLogicalToDataType( + FlinkTypeFactory.toLogicalType( + originalInputRowType + .getFieldList() + .get(argIndex) + .getType())); + FieldReferenceExpression field = + new FieldReferenceExpression( + originalInputRowType.getFieldNames().get(argIndex), + argType, + argIndex, + argIndex); + arguments.add(field); + } + if (aggInfo.function() instanceof AvgAggFunction) { + Tuple2<Sum0AggFunction, CountAggFunction> sum0AndCountFunction = + AggregateUtil.deriveSumAndCountFromAvg(aggInfo.function()); + AggregateExpression sum0Expression = + new AggregateExpression( + sum0AndCountFunction._1(), + arguments, + null, + aggInfo.externalResultType(), + aggInfo.agg().isDistinct(), + 
aggInfo.agg().isApproximate(), + aggInfo.agg().ignoreNulls()); + aggExpressions.add(sum0Expression); + AggregateExpression countExpression = + new AggregateExpression( + sum0AndCountFunction._2(), + arguments, + null, + aggInfo.externalResultType(), + aggInfo.agg().isDistinct(), + aggInfo.agg().isApproximate(), + aggInfo.agg().ignoreNulls()); + aggExpressions.add(countExpression); + } else { + AggregateExpression aggregateExpression = + new AggregateExpression( + aggInfo.function(), + arguments, + null, + aggInfo.externalResultType(), + aggInfo.agg().isDistinct(), + aggInfo.agg().isApproximate(), + aggInfo.agg().ignoreNulls()); + aggExpressions.add(aggregateExpression); + } + } + return aggExpressions; + } + + private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + FlinkStatistic newStatistic; + if (oldStatistic == FlinkStatistic.UNKNOWN()) { + newStatistic = oldStatistic; + } else { + // Remove tableStats after all of aggregate have been pushed down + newStatistic = + FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build(); + } + return newStatistic; + } + + private String[] getNewExtraDigests( + RelDataType originalInputRowType, + int[] grouping, + List<AggregateExpression> aggregateExpressions) { + String extraDigest; + String groupingStr = "null"; + if (grouping.length > 0) { + groupingStr = + Arrays.stream(grouping) + .mapToObj(index -> originalInputRowType.getFieldNames().get(index)) + .collect(Collectors.joining(",")); + } + String aggFunctionsStr = "null"; + if (aggregateExpressions.size() > 0) { Review comment: we can remove this `if` because aggregateExpressions should not be empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
