godfreyhe commented on a change in pull request #17344: URL: https://github.com/apache/flink/pull/17344#discussion_r737986628
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.abilities.source; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.expressions.AggregateExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; +import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction; +import org.apache.flink.table.planner.plan.utils.AggregateInfo; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import org.apache.calcite.rel.core.AggregateCall; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation + * to/from JSON, but also can push the local aggregate into a {@link SupportsAggregatePushDown}. 
+ */ +@JsonTypeName("AggregatePushDown") +public class AggregatePushDownSpec extends SourceAbilitySpecBase { + + public static final String FIELD_NAME_INPUT_TYPE = "inputType"; + + public static final String FIELD_NAME_GROUPING_SETS = "groupingSets"; + + public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls"; + + @JsonProperty(FIELD_NAME_INPUT_TYPE) + private final RowType inputType; + + @JsonProperty(FIELD_NAME_GROUPING_SETS) + private final List<int[]> groupingSets; + + @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) + private final List<AggregateCall> aggregateCalls; + + @JsonCreator + public AggregatePushDownSpec( + @JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType, + @JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets, + @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall> aggregateCalls, + @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) { + super(producedType); + + this.inputType = inputType; + this.groupingSets = new ArrayList<>(checkNotNull(groupingSets)); + this.aggregateCalls = aggregateCalls; + } + + @Override + public void apply(DynamicTableSource tableSource, SourceAbilityContext context) { + checkArgument(getProducedType().isPresent()); + apply( + inputType, + groupingSets, + aggregateCalls, + getProducedType().get(), + tableSource, + context); + } + + @Override + public String getDigests(SourceAbilityContext context) { + int[] grouping = groupingSets.get(0); + String groupingStr = + Arrays.stream(grouping) + .mapToObj(index -> inputType.getFieldNames().get(index)) + .collect(Collectors.joining(",")); + + List<AggregateExpression> aggregateExpressions = + buildAggregateExpressions(inputType, aggregateCalls); + String aggFunctionsStr = + aggregateExpressions.stream() + .map(AggregateExpression::asSummaryString) + .collect(Collectors.joining(",")); + + return "aggregates=[grouping=[" + + groupingStr + + "], aggFunctions=[" + + aggFunctionsStr + + "]]"; + } + + public static boolean apply( + RowType inputType, + List<int[]> groupingSets, + List<AggregateCall> aggregateCalls, + RowType producedType, + DynamicTableSource tableSource, + SourceAbilityContext context) { + assert context.isBatchMode(); Review comment: add `assert groupingSets.size() == 1` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexSlot; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * whose table is a {@link TableSourceTable} with a source supporting {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. 
+ * </ul> + */ +public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule { + + public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean checkMatchesAggregatePushDown( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) { + return false; + } + List<AggregateCall> aggCallList = + JavaScalaConversionUtil.toJava(aggregate.getAggCallList()); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + pushLocalAggregateIntoScan(call, localAgg, oldScan, null); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan, + int[] calcRefFields) { + RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType()); + List<int[]> groupingSets = + Collections.singletonList( + ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping())); + List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList()); + + // map arg index in aggregate to field index in scan through referred fields by calc. + if (calcRefFields != null) { + groupingSets = translateGroupingArgIndex(groupingSets, calcRefFields); + aggCallList = translateAggCallArgIndex(aggCallList, calcRefFields); + } + + RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType()); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + inputType, + groupingSets, + aggCallList, + producedType, + newTableSource, + SourceAbilityContext.from(oldScan)); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. + return; + } + + // create new source table with new spec and statistic. 
+ AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType); + + Set<String> groupColumns = + Arrays.stream(groupingSets.get(0)) + .boxed() + .map(idx -> inputType.getFieldNames().get(idx)) + .collect(Collectors.toSet()); + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable, groupColumns); + + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + localAgg.getRowType(), + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(newFlinkStatistic); + + // transform to new nodes. + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private FlinkStatistic getNewFlinkStatistic( + TableSourceTable tableSourceTable, Set<String> groupColumns) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + + // Create new unique keys if there are group columns + Set<Set<String>> uniqueKeys = null; + if (!groupColumns.isEmpty()) { + uniqueKeys = new HashSet<>(); + uniqueKeys.add(groupColumns); + } + + // Remove tableStats after all aggregates have been pushed down + return FlinkStatistic.builder() + .statistic(oldStatistic) + .uniqueKeys(uniqueKeys) + .tableStats(null) + .build(); + } + + protected boolean checkNoProjectionPushDown(BatchPhysicalTableSourceScan tableSourceScan) { + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + return tableSourceTable != null + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof ProjectPushDownSpec); + } + + /** + * Currently, we only supports to push down aggregate above calc which has input ref only. + * + * @param calc BatchPhysicalCalc + * @return true if OK to be pushed down + */ + protected boolean checkCalcInputRefOnly(BatchPhysicalCalc calc) { Review comment: isInputRefOnly ? ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.runtime.batch.sql.agg; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.types.Row; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +/** Test for local aggregate push down. */ +public class LocalAggregatePushDownITCase extends BatchTestBase { + + @Before + public void before() { + super.before(); + env().setParallelism(1); // set sink parallelism to 1 + + String testDataId = TestValuesTableFactory.registerData(TestData.personData()); + String ddl = + "CREATE TABLE AggregatableTable (\n" + + " id int,\n" + + " age int,\n" + + " name string,\n" + + " height int,\n" + + " gender string,\n" + + " deposit bigint,\n" + + " points bigint,\n" + + " metadata_1 BIGINT METADATA,\n" + + " metadata_2 STRING METADATA\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'data-id' = '" + + testDataId + + "',\n" + + " 'filterable-fields' = 'id;age',\n" + + " 'readable-metadata' = 'metadata_1:BIGINT, metadata_2:STRING',\n" + + " 'bounded' = 'true'\n" + + ")"; + tEnv().executeSql(ddl); + + // partitioned table + String testDataId2 = TestValuesTableFactory.registerData(TestData.personData()); + String ddl2 = + "CREATE TABLE AggregatableTable_Part (\n" + + " id int,\n" + + " age int,\n" + + " name string,\n" + + " height int,\n" + + " gender string,\n" + + " deposit bigint,\n" + + " points bigint,\n" + + " distance BIGINT,\n" + + " type STRING\n" + + ") PARTITIONED BY (type)\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'data-id' = '" + + testDataId + + "',\n" + + " 'filterable-fields' = 'id;age',\n" + + " 'partition-list' = 'type:A;type:B;type:C;type:D',\n" + + " 'bounded' = 'true'\n" + + ")"; + tEnv().executeSql(ddl2); + + // partitioned table + String testDataId3 = TestValuesTableFactory.registerData(TestData.personData()); Review comment: ditto ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexSlot; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * whose table is a {@link TableSourceTable} with a source supporting {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. 
+ * </ul> + */ +public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule { + + public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean checkMatchesAggregatePushDown( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) { + return false; + } + List<AggregateCall> aggCallList = + JavaScalaConversionUtil.toJava(aggregate.getAggCallList()); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + pushLocalAggregateIntoScan(call, localAgg, oldScan, null); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan, + int[] calcRefFields) { + RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType()); + List<int[]> groupingSets = + Collections.singletonList( + ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping())); + List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList()); + + // map arg index in aggregate to field index in scan through referred fields by calc. + if (calcRefFields != null) { + groupingSets = translateGroupingArgIndex(groupingSets, calcRefFields); + aggCallList = translateAggCallArgIndex(aggCallList, calcRefFields); + } + + RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType()); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + inputType, + groupingSets, + aggCallList, + producedType, + newTableSource, + SourceAbilityContext.from(oldScan)); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. + return; + } + + // create new source table with new spec and statistic. 
+ AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType); + + Set<String> groupColumns = + Arrays.stream(groupingSets.get(0)) + .boxed() + .map(idx -> inputType.getFieldNames().get(idx)) + .collect(Collectors.toSet()); + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable, groupColumns); + + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + localAgg.getRowType(), + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(newFlinkStatistic); + + // transform to new nodes. + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private FlinkStatistic getNewFlinkStatistic( + TableSourceTable tableSourceTable, Set<String> groupColumns) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + + // Create new unique keys if there are group columns + Set<Set<String>> uniqueKeys = null; + if (!groupColumns.isEmpty()) { + uniqueKeys = new HashSet<>(); + uniqueKeys.add(groupColumns); + } + + // Remove tableStats after all aggregates have been pushed down + return FlinkStatistic.builder() + .statistic(oldStatistic) + .uniqueKeys(uniqueKeys) + .tableStats(null) + .build(); + } + + protected boolean checkNoProjectionPushDown(BatchPhysicalTableSourceScan tableSourceScan) { Review comment: isProjectionNotPushDown ? ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java ########## @@ -286,6 +298,9 @@ private static RowKind parseRowKind(String rowKindShortString) { private static final ConfigOption<Integer> SINK_EXPECTED_MESSAGES_NUM = ConfigOptions.key("sink-expected-messages-num").intType().defaultValue(-1); + private static final ConfigOption<Boolean> DISABLE_PROJECTION_PUSH_DOWN = + ConfigOptions.key("disable-projection-push-down").booleanType().defaultValue(false); Review comment: enable-projection-push-down ? which is more intuitive ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexSlot; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * whose table is a {@link TableSourceTable} with a source supporting {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. 
+ * </ul> + */ +public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule { + + public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean checkMatchesAggregatePushDown( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) { + return false; + } + List<AggregateCall> aggCallList = + JavaScalaConversionUtil.toJava(aggregate.getAggCallList()); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + pushLocalAggregateIntoScan(call, localAgg, oldScan, null); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan, + int[] calcRefFields) { + RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType()); + List<int[]> groupingSets = + Collections.singletonList( + ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping())); + List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList()); + + // map arg index in aggregate to field index in scan through referred fields by calc. + if (calcRefFields != null) { + groupingSets = translateGroupingArgIndex(groupingSets, calcRefFields); + aggCallList = translateAggCallArgIndex(aggCallList, calcRefFields); + } + + RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType()); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + inputType, + groupingSets, + aggCallList, + producedType, + newTableSource, + SourceAbilityContext.from(oldScan)); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. + return; + } + + // create new source table with new spec and statistic. 
+ AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType); + + Set<String> groupColumns = + Arrays.stream(groupingSets.get(0)) + .boxed() + .map(idx -> inputType.getFieldNames().get(idx)) + .collect(Collectors.toSet()); + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable, groupColumns); + + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + localAgg.getRowType(), + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(newFlinkStatistic); + + // transform to new nodes. + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private FlinkStatistic getNewFlinkStatistic( + TableSourceTable tableSourceTable, Set<String> groupColumns) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + + // Create new unique keys if there are group columns + Set<Set<String>> uniqueKeys = null; + if (!groupColumns.isEmpty()) { + uniqueKeys = new HashSet<>(); + uniqueKeys.add(groupColumns); + } + + // Remove tableStats after all aggregates have been pushed down + return FlinkStatistic.builder() + .statistic(oldStatistic) + .uniqueKeys(uniqueKeys) Review comment: the `uniqueKeys` is not correct: we only push down the local aggregate, not the global aggregate, so the results produced by different parallel instances may overlap. This method should return `FlinkStatistic.UNKNOWN` instead.
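
For illustration, a minimal sketch of the change this comment suggests; the now-unused `groupColumns` parameter (and its computation at the call site) would be dropped as well. A sketch, not the final patch:

    private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) {
        // A local aggregate only pre-aggregates within each parallel instance, so
        // the group columns are not unique across instances and the old table
        // stats no longer describe the produced rows; report unknown statistics.
        return FlinkStatistic.UNKNOWN;
    }

########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.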
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexSlot; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * whose table is a {@link TableSourceTable} with a source supporting {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. + * </ul> + */ +public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule { + + public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean checkMatchesAggregatePushDown( Review comment: canPushDown ? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexSlot; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * whose table is a {@link TableSourceTable} with a source supporting {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. 
+ * </ul> + */ +public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule { + + public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean checkMatchesAggregatePushDown( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) { + return false; + } + List<AggregateCall> aggCallList = + JavaScalaConversionUtil.toJava(aggregate.getAggCallList()); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + pushLocalAggregateIntoScan(call, localAgg, oldScan, null); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan, + int[] calcRefFields) { + RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType()); + List<int[]> groupingSets = + Collections.singletonList( + ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping())); + List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList()); + + // map arg index in aggregate to field index in scan through referred fields by calc. + if (calcRefFields != null) { + groupingSets = translateGroupingArgIndex(groupingSets, calcRefFields); + aggCallList = translateAggCallArgIndex(aggCallList, calcRefFields); + } + + RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType()); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + inputType, + groupingSets, + aggCallList, + producedType, + newTableSource, + SourceAbilityContext.from(oldScan)); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. + return; + } + + // create new source table with new spec and statistic. 
+ AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType); + + Set<String> groupColumns = + Arrays.stream(groupingSets.get(0)) + .boxed() + .map(idx -> inputType.getFieldNames().get(idx)) + .collect(Collectors.toSet()); + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable, groupColumns); + + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + localAgg.getRowType(), + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(newFlinkStatistic); + + // transform to new nodes. + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private FlinkStatistic getNewFlinkStatistic( + TableSourceTable tableSourceTable, Set<String> groupColumns) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + + // Create new unique keys if there are group columns + Set<Set<String>> uniqueKeys = null; + if (!groupColumns.isEmpty()) { + uniqueKeys = new HashSet<>(); + uniqueKeys.add(groupColumns); + } + + // Remove tableStats after all aggregates have been pushed down + return FlinkStatistic.builder() + .statistic(oldStatistic) + .uniqueKeys(uniqueKeys) + .tableStats(null) + .build(); + } + + protected boolean checkNoProjectionPushDown(BatchPhysicalTableSourceScan tableSourceScan) { + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + return tableSourceTable != null + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof ProjectPushDownSpec); + } + + /** + * Currently, we only supports to push down aggregate above calc which has input ref only. + * + * @param calc BatchPhysicalCalc + * @return true if OK to be pushed down + */ + protected boolean checkCalcInputRefOnly(BatchPhysicalCalc calc) { + RexProgram program = calc.getProgram(); + + // check if condition exists. All filters should have been pushed down. + if (program.getCondition() != null) { + return false; + } + + return program.getExprList().stream().allMatch(RexInputRef.class::isInstance) Review comment: in most cases we convert the RexLocalRef list to a RexNode list via `calc.getProgram()::expandLocalRef`, e.g. program.getProjectList().stream() .map(calc.getProgram()::expandLocalRef) .allMatch(RexInputRef.class::isInstance)
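
For illustration, a sketch of the method with this suggestion applied, also assuming the `isInputRefOnly` rename proposed in the earlier comment on this method. A sketch under those assumptions, not the final patch:

    protected boolean isInputRefOnly(BatchPhysicalCalc calc) {
        RexProgram program = calc.getProgram();
        // all filters should have been pushed down before this rule matches
        if (program.getCondition() != null) {
            return false;
        }
        // a RexProgram stores its projections as RexLocalRefs; expand them to the
        // underlying RexNodes before testing that every projection is a plain input ref
        return program.getProjectList().stream()
                .map(program::expandLocalRef)
                .allMatch(RexInputRef.class::isInstance);
    }

########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortAndCalcIntoScanRule.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.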
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; + +import org.apache.calcite.plan.RelOptRuleCall; + +/** + * Planner rule that tries to push a local sort aggregate which with sort into a {@link Review comment: with sort and calc ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql.agg; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.types.Row; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +/** Test for local aggregate push down. 
*/ +public class LocalAggregatePushDownITCase extends BatchTestBase { + + @Before + public void before() { + super.before(); + env().setParallelism(1); // set sink parallelism to 1 + + String testDataId = TestValuesTableFactory.registerData(TestData.personData()); + String ddl = + "CREATE TABLE AggregatableTable (\n" + + " id int,\n" + + " age int,\n" + + " name string,\n" + + " height int,\n" + + " gender string,\n" + + " deposit bigint,\n" + + " points bigint,\n" + + " metadata_1 BIGINT METADATA,\n" + + " metadata_2 STRING METADATA\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'data-id' = '" + + testDataId + + "',\n" + + " 'filterable-fields' = 'id;age',\n" + + " 'readable-metadata' = 'metadata_1:BIGINT, metadata_2:STRING',\n" + + " 'bounded' = 'true'\n" + + ")"; + tEnv().executeSql(ddl); + + // partitioned table + String testDataId2 = TestValuesTableFactory.registerData(TestData.personData()); Review comment: this test data is useless ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithCalcIntoScanRule.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; + +import org.apache.calcite.plan.RelOptRuleCall; + +/** + * Planner rule that tries to push a local sort aggregate which without sort into a {@link + * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting + * {@link SupportsAggregatePushDown}. The {@link + * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} need to be true. 
+ * + * <p>Suppose we have the original physical plan: + * + * <pre>{@code + * BatchPhysicalSortAggregate (global) + * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) + * +- BatchPhysicalLocalSortAggregate (local) + * +- BatchPhysicalCalc (filed projection only) + * +- BatchPhysicalTableSourceScan + * }</pre> + * + * <p>This physical plan will be rewritten to: + * + * <pre>{@code + * BatchPhysicalSortAggregate (global) + * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton) + * +- BatchPhysicalTableSourceScan (with local aggregate pushed down) + * }</pre> + */ +public class PushLocalSortAggWithCalcIntoScanRule extends PushLocalAggIntoScanRuleBase { + public static final PushLocalSortAggWithCalcIntoScanRule INSTANCE = + new PushLocalSortAggWithCalcIntoScanRule(); + + public PushLocalSortAggWithCalcIntoScanRule() { + super( + operand( + BatchPhysicalExchange.class, + operand( + BatchPhysicalLocalSortAggregate.class, + operand( + BatchPhysicalCalc.class, + operand(BatchPhysicalTableSourceScan.class, none())))), + "PushLocalSortAggWithCalcIntoScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + BatchPhysicalLocalSortAggregate localAggregate = call.rel(1); + BatchPhysicalCalc calc = call.rel(2); + BatchPhysicalTableSourceScan tableSourceScan = call.rel(3); + + return checkCalcInputRefOnly(calc) + && checkNoProjectionPushDown(tableSourceScan) + && checkMatchesAggregatePushDown(call, localAggregate, tableSourceScan); + } + + @Override + public void onMatch(RelOptRuleCall call) { + BatchPhysicalLocalSortAggregate localHashAgg = call.rel(1); + BatchPhysicalCalc calc = call.rel(2); + BatchPhysicalTableSourceScan oldScan = call.rel(3); + + int[] calcRefFields = + RexNodeExtractor.extractRefInputFields(calc.getProgram().getExprList()); Review comment: program.getProjectList().stream() .map(calc.getProgram()::expandLocalRef) ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.runtime.batch.sql.agg; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.types.Row; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +/** Test for local aggregate push down. */ +public class LocalAggregatePushDownITCase extends BatchTestBase { Review comment: please add a test case about pushing down a local aggregate with auxGrouping ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexSlot; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan} + * whose table is a {@link TableSourceTable} with a source supporting {@link + * SupportsAggregatePushDown}. + * + * <p>The aggregate push down does not support a number of more complex statements at present: + * + * <ul> + * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS. + * <li>expressions inside the aggregation function call: such as sum(a * b). + * <li>aggregations with ordering. + * <li>aggregations with filter. 
+ * </ul> + */ +public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule { + + public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected boolean checkMatchesAggregatePushDown( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase aggregate, + BatchPhysicalTableSourceScan tableSourceScan) { + TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!tableConfig + .getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) { + return false; + } + + if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) { + return false; + } + List<AggregateCall> aggCallList = + JavaScalaConversionUtil.toJava(aggregate.getAggCallList()); + for (AggregateCall aggCall : aggCallList) { + if (aggCall.isDistinct() + || aggCall.isApproximate() + || aggCall.getArgList().size() > 1 + || aggCall.hasFilter() + || !aggCall.getCollation().getFieldCollations().isEmpty()) { + return false; + } + } + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + // we can not push aggregates twice + return tableSourceTable != null + && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof AggregatePushDownSpec); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan) { + pushLocalAggregateIntoScan(call, localAgg, oldScan, null); + } + + protected void pushLocalAggregateIntoScan( + RelOptRuleCall call, + BatchPhysicalGroupAggregateBase localAgg, + BatchPhysicalTableSourceScan oldScan, + int[] calcRefFields) { + RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType()); + List<int[]> groupingSets = + Collections.singletonList( + ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping())); + List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList()); + + // map arg index in aggregate to field index in scan through referred fields by calc. + if (calcRefFields != null) { + groupingSets = translateGroupingArgIndex(groupingSets, calcRefFields); + aggCallList = translateAggCallArgIndex(aggCallList, calcRefFields); + } + + RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType()); + + TableSourceTable oldTableSourceTable = oldScan.tableSourceTable(); + DynamicTableSource newTableSource = oldScan.tableSource().copy(); + + boolean isPushDownSuccess = + AggregatePushDownSpec.apply( + inputType, + groupingSets, + aggCallList, + producedType, + newTableSource, + SourceAbilityContext.from(oldScan)); + + if (!isPushDownSuccess) { + // aggregate push down failed, just return without changing any nodes. + return; + } + + // create new source table with new spec and statistic. 
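+        // Note: the spec created below records the pushed aggregation in the scan's +        // ability specs, and the rebuilt statistic drops the old table stats, which +        // no longer hold once the source returns pre-aggregated rows.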
+ AggregatePushDownSpec aggregatePushDownSpec = + new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType); + + Set<String> groupColumns = + Arrays.stream(groupingSets.get(0)) + .boxed() + .map(idx -> inputType.getFieldNames().get(idx)) + .collect(Collectors.toSet()); + FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable, groupColumns); + + TableSourceTable newTableSourceTable = + oldTableSourceTable + .copy( + newTableSource, + localAgg.getRowType(), + new SourceAbilitySpec[] {aggregatePushDownSpec}) + .copy(newFlinkStatistic); + + // transform to new nodes. + BatchPhysicalTableSourceScan newScan = + oldScan.copy(oldScan.getTraitSet(), newTableSourceTable); + BatchPhysicalExchange oldExchange = call.rel(0); + BatchPhysicalExchange newExchange = + oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution()); + call.transformTo(newExchange); + } + + private FlinkStatistic getNewFlinkStatistic( + TableSourceTable tableSourceTable, Set<String> groupColumns) { + FlinkStatistic oldStatistic = tableSourceTable.getStatistic(); + + // Create new unique keys if there are group columns + Set<Set<String>> uniqueKeys = null; + if (!groupColumns.isEmpty()) { + uniqueKeys = new HashSet<>(); + uniqueKeys.add(groupColumns); + } + + // Remove tableStats after all aggregates have been pushed down + return FlinkStatistic.builder() + .statistic(oldStatistic) + .uniqueKeys(uniqueKeys) + .tableStats(null) + .build(); + } + + protected boolean checkNoProjectionPushDown(BatchPhysicalTableSourceScan tableSourceScan) { + TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable(); + return tableSourceTable != null + && Arrays.stream(tableSourceTable.abilitySpecs()) + .noneMatch(spec -> spec instanceof ProjectPushDownSpec); + } + + /** + * Currently, we only support pushing down an aggregate above a calc whose projection contains input refs only. + * + * @param calc BatchPhysicalCalc + * @return true if OK to be pushed down + */ + protected boolean checkCalcInputRefOnly(BatchPhysicalCalc calc) { + RexProgram program = calc.getProgram(); + + // check if condition exists. All filters should have been pushed down. + if (program.getCondition() != null) { + return false; + } + + return program.getExprList().stream().allMatch(RexInputRef.class::isInstance) + && !program.getProjectList().isEmpty(); + } + + protected int[] getRefFiledIndexFromCalc(BatchPhysicalCalc calc) { Review comment: use RexNodeExtractor.extractRefInputFields instead ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.java ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.functions.aggfunctions.CollectAggFunction; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** + * Test for rules that extend {@link PushLocalAggIntoScanRuleBase} to push down local aggregates + * into table source. + */ +public class PushLocalAggIntoTableSourceScanRuleTest extends TableTestBase { + protected BatchTableTestUtil util = batchTestUtil(new TableConfig()); + + @Before + public void setup() { + TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig + .getConfiguration() + .setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, + true); + String ddl = + "CREATE TABLE inventory (\n" + + " id BIGINT,\n" + + " name STRING,\n" + + " amount BIGINT,\n" + + " price BIGINT,\n" + + " type STRING\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'id;type',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + + String ddl2 = + "CREATE TABLE inventory_meta (\n" + + " id BIGINT,\n" + + " name STRING,\n" + + " amount BIGINT,\n" + + " price BIGINT,\n" + + " type STRING,\n" + + " metadata_1 BIGINT METADATA,\n" + + " metadata_2 STRING METADATA,\n" + + " PRIMARY KEY (`id`) NOT ENFORCED\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'id;type',\n" + + " 'readable-metadata' = 'metadata_1:BIGINT, metadata_2:STRING',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl2); + + // partitioned table + String ddl3 = + "CREATE TABLE inventory_part (\n" + + " id BIGINT,\n" + + " name STRING,\n" + + " amount BIGINT,\n" + + " price BIGINT,\n" + + " type STRING\n" + + ") PARTITIONED BY (type)\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'id;type',\n" + + " 'partition-list' = 'type:a;type:b',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl3); + + // disable projection push down + String ddl4 = + "CREATE TABLE inventory_no_proj (\n" + + " id BIGINT,\n" + + " name STRING,\n" + + " amount BIGINT,\n" + + " price BIGINT,\n" + + " type STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'id;type',\n" + + " 'disable-projection-push-down' = 'true',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl4); + } + + @Test + public void testCanPushDownLocalHashAggWithGroup() { + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " name,\n" + + " type\n" + + "FROM inventory\n" + + " group by name, type"); + } + + @Test + public void testDisablePushDownLocalAgg() { + // disable push down local agg + util.getTableEnv() + .getConfig() + .getConfiguration() + .setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, + false); + + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " name,\n" + + " type\n" + + "FROM inventory\n" + + " group by name, type"); + + // reset config + util.getTableEnv() + .getConfig() + .getConfiguration() + .setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, + true); + } + + @Test + 
public void testCanPushDownLocalHashAggWithoutGroup() { + util.verifyRelPlan( + "SELECT\n" + + " min(id),\n" + + " max(amount),\n" + + " sum(price),\n" + + " avg(price),\n" + + " count(id)\n" + + "FROM inventory"); + } + + @Test + public void testCanPushDownLocalSortAggWithoutSort() { + // enable sort agg + util.getTableEnv() + .getConfig() + .getConfiguration() + .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg"); + + util.verifyRelPlan( + "SELECT\n" + + " min(id),\n" + + " max(amount),\n" + + " sum(price),\n" + + " avg(price),\n" + + " count(id)\n" + + "FROM inventory"); + + // reset config + util.getTableEnv() + .getConfig() + .getConfiguration() + .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, ""); + } + + @Test + public void testCanPushDownLocalSortAggWithSort() { + // enable sort agg + util.getTableEnv() + .getConfig() + .getConfiguration() + .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashAgg"); + + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " name,\n" + + " type\n" + + "FROM inventory\n" + + " group by name, type"); + + // reset config + util.getTableEnv() + .getConfig() + .getConfiguration() + .setString(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, ""); + } + + @Test + public void testCanPushDownLocalAggAfterFilterPushDown() { + + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " name,\n" + + " type\n" + + "FROM inventory\n" + + " where id = 123\n" + + " group by name, type"); + } + + @Test + public void testCanPushDownLocalAggWithMetadata() { + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " max(metadata_1),\n" + + " name,\n" + + " type\n" + + "FROM inventory_meta\n" + + " where id = 123\n" + + " group by name, type"); + } + + @Test + public void testCanPushDownLocalAggWithPartition() { + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " type,\n" + + " name\n" + + "FROM inventory_part\n" + + " where type in ('a', 'b') and id = 123\n" + + " group by type, name"); + } + + @Test + public void testCanPushDownLocalAggWithoutProjectionPushDown() { + util.verifyRelPlan( + "SELECT\n" + + " sum(amount),\n" + + " name,\n" + + " type\n" + + "FROM inventory_no_proj\n" + + " where id = 123\n" + + " group by name, type"); + } + + @Test + public void testCannotPushDownLocalAggWithAuxGrouping() { + util.verifyRelPlan( + "SELECT\n" + + " id, name, count(*)\n" + + "FROM inventory_meta\n" + + " group by id, name, abs(amount)"); Review comment: remove abs(amount), otherwise the local aggregate with auxGrouping cannot be pushed down.
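A possible shape for the revised test, as a sketch only — the test name is hypothetical and the expected plan would still need to be regenerated. The point of the comment above: grouping on the primary key `id` lets `name` act as an auxGrouping field, and with `abs(amount)` gone the calc projects plain input refs, so the local aggregate can be pushed down:

    @Test
    public void testCanPushDownLocalAggWithAuxGrouping() {
        // `id` is the primary key of inventory_meta, so `name` is treated as an
        // auxGrouping field rather than a second grouping key.
        util.verifyRelPlan(
                "SELECT\n"
                        + "  id, name, count(*)\n"
                        + "FROM inventory_meta\n"
                        + "  group by id, name");
    }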

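On the earlier comment about PushLocalSortAggWithCalcIntoScanRule#onMatch: the suggested extraction could look roughly like the following sketch. The helper name is hypothetical; it assumes imports for RexNode, RexProgram, List, and Collectors alongside those the rule base already has, and it passes a list of RexNode to RexNodeExtractor.extractRefInputFields, matching its use at the original call site.

    // Hypothetical helper: expand the program's local refs from the project list
    // so that only the expressions the calc actually projects are inspected,
    // rather than everything returned by getExprList().
    private static int[] extractCalcRefFields(BatchPhysicalCalc calc) {
        RexProgram program = calc.getProgram();
        List<RexNode> projects =
                program.getProjectList().stream()
                        .map(program::expandLocalRef)
                        .collect(Collectors.toList());
        return RexNodeExtractor.extractRefInputFields(projects);
    }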