Airblader commented on a change in pull request #17344:
URL: https://github.com/apache/flink/pull/17344#discussion_r715005516



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
##########
@@ -93,6 +93,16 @@
                                     + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
                                     + " is true.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED =
+            key("table.optimizer.source.aggregate-pushdown-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When it is true, the optimizer will push down the local aggregates into "
+                                    + "the TableSource which implements SupportsAggregatePushDown. "
+                                    + "Default value is false.");

Review comment:
       The default doesn't need to be stated here. It is recorded above and the docs generator will use that, so this leads to duplication.
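
   For illustration, a sketch of the same option without the restated default (the docs generator renders the value passed to `defaultValue(false)` automatically):

   ```java
   public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED =
           key("table.optimizer.source.aggregate-pushdown-enabled")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "When it is true, the optimizer will push down the local aggregates into "
                                   + "the TableSource which implements SupportsAggregatePushDown.");
   ```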

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
##########
@@ -93,6 +93,16 @@
                                     + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
                                     + " is true.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED =
+            key("table.optimizer.source.aggregate-pushdown-enabled")
+                    .booleanType()
+                    .defaultValue(false)

Review comment:
       Is there a reason to disable this by default? I would expect that a source implements the interface only if it does so properly. By disabling this we hide this optimization from the majority of users.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan}
+ * which table is a {@link TableSourceTable}. And the table source in the table is a {@link

Review comment:
   ```suggestion
    * whose table is a {@link TableSourceTable} with a source supporting {@link
   ```

   The same applies to the subclasses.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan}
+ * which table is a {@link TableSourceTable}. And the table source in the table is a {@link
+ * SupportsAggregatePushDown}.
+ *
+ * <p>The aggregate push down does not support a number of more complex statements at present:
+ *
+ * <ul>
+ *   <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS.
+ *   <li>expressions inside the aggregation function call: such as sum(a * b).
+ *   <li>aggregations with ordering.
+ *   <li>aggregations with filter.
+ * </ul>
+ */
+public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule {
+
+    public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) {
+        super(operand, description);
+    }
+
+    protected boolean isMatch(
+            RelOptRuleCall call,
+            BatchPhysicalGroupAggregateBase aggregate,
+            BatchPhysicalTableSourceScan tableSourceScan) {
+        TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+        if (!tableConfig
+                .getConfiguration()
+                .getBoolean(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
+            return false;
+        }
+
+        if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) {
+            return false;
+        }
+        List<AggregateCall> aggCallList =
+                JavaScalaConversionUtil.toJava(aggregate.getAggCallList());
+        for (AggregateCall aggCall : aggCallList) {
+            if (aggCall.isDistinct()
+                    || aggCall.isApproximate()
+                    || aggCall.getArgList().size() > 1
+                    || aggCall.hasFilter()
+                    || !aggCall.getCollation().getFieldCollations().isEmpty()) {
+                return false;
+            }
+        }
+        TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable();
+        // we can not push aggregates twice
+        return tableSourceTable != null
+                && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown
+                && Arrays.stream(tableSourceTable.abilitySpecs())
+                        .noneMatch(spec -> spec instanceof AggregatePushDownSpec);
+    }
+
+    protected void pushLocalAggregateIntoScan(
+            RelOptRuleCall call,
+            BatchPhysicalGroupAggregateBase localAgg,
+            BatchPhysicalTableSourceScan oldScan) {
+        RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType());
+        List<int[]> groupingSets = Arrays.asList(localAgg.grouping(), localAgg.auxGrouping());
+        List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList());
+        RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType());
+
+        TableSourceTable oldTableSourceTable = oldScan.tableSourceTable();
+        DynamicTableSource newTableSource = oldScan.tableSource().copy();
+
+        boolean isPushDownSuccess =
+                AggregatePushDownSpec.apply(
+                        inputType, groupingSets, aggCallList, producedType, newTableSource);
+
+        if (!isPushDownSuccess) {
+            // aggregate push down failed, just return without changing any nodes.
+            return;
+        }
+
+        FlinkStatistic newFlinkStatistic = getNewFlinkStatistic(oldTableSourceTable);
+        AggregatePushDownSpec aggregatePushDownSpec =
+                new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType);
+
+        TableSourceTable newTableSourceTable =
+                oldTableSourceTable
+                        .copy(
+                                newTableSource,
+                                newFlinkStatistic,
+                                new SourceAbilitySpec[] {aggregatePushDownSpec})
+                        .copy(localAgg.getRowType());
+        BatchPhysicalTableSourceScan newScan =
+                oldScan.copy(oldScan.getTraitSet(), newTableSourceTable);
+        BatchPhysicalExchange oldExchange = call.rel(0);
+        BatchPhysicalExchange newExchange =
+                oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution());
+        call.transformTo(newExchange);
+    }
+
+    private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) {
+        FlinkStatistic oldStatistic = tableSourceTable.getStatistic();
+        FlinkStatistic newStatistic;

Review comment:
       nit: this variable is unnecessary; you can return directly from within the if/else.
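
   The hunk is truncated here, but for illustration a sketch of the direct-return version might look like this (the else-branch body is hypothetical, since the original is cut off; it assumes the usual pattern of dropping the table stats after the push-down):

   ```java
   private FlinkStatistic getNewFlinkStatistic(TableSourceTable tableSourceTable) {
       FlinkStatistic oldStatistic = tableSourceTable.getStatistic();
       if (oldStatistic == FlinkStatistic.UNKNOWN()) {
           // nothing to derive from an unknown statistic
           return oldStatistic;
       } else {
           // the pushed-down aggregate changes the row count, so drop the old table stats
           return FlinkStatistic.builder().statistic(oldStatistic).tableStats(null).build();
       }
   }
   ```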

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
+import org.apache.flink.table.planner.plan.utils.AggregateInfo;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation
+ * to/from JSON, but also can push the local aggregate into a {@link SupportsAggregatePushDown}.
+ */
+@JsonTypeName("AggregatePushDown")
+public class AggregatePushDownSpec extends SourceAbilitySpecBase {
+
+    public static final String FIELD_NAME_INPUT_TYPE = "inputType";
+
+    public static final String FIELD_NAME_GROUPING_SETS = "groupingSets";
+
+    public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls";
+
+    @JsonProperty(FIELD_NAME_INPUT_TYPE)
+    private final RowType inputType;
+
+    @JsonProperty(FIELD_NAME_GROUPING_SETS)
+    private final List<int[]> groupingSets;
+
+    @JsonProperty(FIELD_NAME_AGGREGATE_CALLS)
+    private final List<AggregateCall> aggregateCalls;
+
+    @JsonCreator
+    public AggregatePushDownSpec(
+            @JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType,
+            @JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets,
+            @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall> aggregateCalls,
+            @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
+        super(producedType);
+
+        this.inputType = inputType;
+        this.groupingSets = new ArrayList<>(checkNotNull(groupingSets));
+        this.aggregateCalls = aggregateCalls;
+    }
+
+    @Override
+    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+        checkArgument(getProducedType().isPresent());
+        apply(inputType, groupingSets, aggregateCalls, getProducedType().get(), tableSource);
+    }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        String extraDigest;
+        String groupingStr = "";
+        int[] grouping = ArrayUtils.addAll(groupingSets.get(0), groupingSets.get(1));
+        if (grouping.length > 0) {

Review comment:
       Here and below, the `if` checks are unnecessary. Iterating over empty lists works just fine.
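
   For example, a hypothetical sketch of the grouping digest without the guard:

   ```java
   // Collectors.joining over an empty stream yields an empty string,
   // so the explicit emptiness check adds nothing.
   String groupingStr =
           Arrays.stream(grouping)
                   .mapToObj(index -> inputType.getFieldNames().get(index))
                   .collect(Collectors.joining(","));
   ```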




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

