Airblader commented on a change in pull request #17344:
URL: https://github.com/apache/flink/pull/17344#discussion_r715005516
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
##########
@@ -93,6 +93,16 @@
+
TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
+ " is true.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Boolean>
TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED =
+ key("table.optimizer.source.aggregate-pushdown-enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When it is true, the optimizer will push down the
local aggregates into "
+ + "the TableSource which implements
SupportsAggregatePushDown. "
+ + "Default value is false.");
Review comment:
The default doesn't need to be stated here. It is recorded above and the
docs generator will use that, so this leads to duplication.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
##########
@@ -93,6 +93,16 @@
+
TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
+ " is true.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Boolean>
TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED =
+ key("table.optimizer.source.aggregate-pushdown-enabled")
+ .booleanType()
+ .defaultValue(false)
Review comment:
Is there a reason to disable this by default? I would expect that a
source implements the interface only if it does so properly. By disabling this
we hide this optimization from the majority of users.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import
org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that tries to push a local aggregator into an {@link
BatchPhysicalTableSourceScan}
+ * which table is a {@link TableSourceTable}. And the table source in the
table is a {@link
Review comment:
```suggestion
* whose table is a {@link TableSourceTable} with a source supporting {@link
```
The same applies to the Javadoc of the subclasses.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import
org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that tries to push a local aggregator into an {@link
BatchPhysicalTableSourceScan}
+ * which table is a {@link TableSourceTable}. And the table source in the
table is a {@link
+ * SupportsAggregatePushDown}.
+ *
+ * <p>The aggregate push down does not support a number of more complex
statements at present:
+ *
+ * <ul>
+ * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS.
+ * <li>expressions inside the aggregation function call: such as sum(a * b).
+ * <li>aggregations with ordering.
+ * <li>aggregations with filter.
+ * </ul>
+ */
+public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule {
+
+ public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String
description) {
+ super(operand, description);
+ }
+
+ protected boolean isMatch(
+ RelOptRuleCall call,
+ BatchPhysicalGroupAggregateBase aggregate,
+ BatchPhysicalTableSourceScan tableSourceScan) {
+ TableConfig tableConfig =
ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+ if (!tableConfig
+ .getConfiguration()
+ .getBoolean(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
+ return false;
+ }
+
+ if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) {
+ return false;
+ }
+ List<AggregateCall> aggCallList =
+ JavaScalaConversionUtil.toJava(aggregate.getAggCallList());
+ for (AggregateCall aggCall : aggCallList) {
+ if (aggCall.isDistinct()
+ || aggCall.isApproximate()
+ || aggCall.getArgList().size() > 1
+ || aggCall.hasFilter()
+ || !aggCall.getCollation().getFieldCollations().isEmpty())
{
+ return false;
+ }
+ }
+ TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable();
+ // we can not push aggregates twice
+ return tableSourceTable != null
+ && tableSourceTable.tableSource() instanceof
SupportsAggregatePushDown
+ && Arrays.stream(tableSourceTable.abilitySpecs())
+ .noneMatch(spec -> spec instanceof
AggregatePushDownSpec);
+ }
+
+ protected void pushLocalAggregateIntoScan(
+ RelOptRuleCall call,
+ BatchPhysicalGroupAggregateBase localAgg,
+ BatchPhysicalTableSourceScan oldScan) {
+ RowType inputType =
FlinkTypeFactory.toLogicalRowType(oldScan.getRowType());
+ List<int[]> groupingSets = Arrays.asList(localAgg.grouping(),
localAgg.auxGrouping());
+ List<AggregateCall> aggCallList =
JavaScalaConversionUtil.toJava(localAgg.getAggCallList());
+ RowType producedType =
FlinkTypeFactory.toLogicalRowType(localAgg.getRowType());
+
+ TableSourceTable oldTableSourceTable = oldScan.tableSourceTable();
+ DynamicTableSource newTableSource = oldScan.tableSource().copy();
+
+ boolean isPushDownSuccess =
+ AggregatePushDownSpec.apply(
+ inputType, groupingSets, aggCallList, producedType,
newTableSource);
+
+ if (!isPushDownSuccess) {
+ // aggregate push down failed, just return without changing any
nodes.
+ return;
+ }
+
+ FlinkStatistic newFlinkStatistic =
getNewFlinkStatistic(oldTableSourceTable);
+ AggregatePushDownSpec aggregatePushDownSpec =
+ new AggregatePushDownSpec(inputType, groupingSets,
aggCallList, producedType);
+
+ TableSourceTable newTableSourceTable =
+ oldTableSourceTable
+ .copy(
+ newTableSource,
+ newFlinkStatistic,
+ new SourceAbilitySpec[]
{aggregatePushDownSpec})
+ .copy(localAgg.getRowType());
+ BatchPhysicalTableSourceScan newScan =
+ oldScan.copy(oldScan.getTraitSet(), newTableSourceTable);
+ BatchPhysicalExchange oldExchange = call.rel(0);
+ BatchPhysicalExchange newExchange =
+ oldExchange.copy(oldExchange.getTraitSet(), newScan,
oldExchange.getDistribution());
+ call.transformTo(newExchange);
+ }
+
+ private FlinkStatistic getNewFlinkStatistic(TableSourceTable
tableSourceTable) {
+ FlinkStatistic oldStatistic = tableSourceTable.getStatistic();
+ FlinkStatistic newStatistic;
Review comment:
nit: this variable is unnecessary; you can return directly from
each branch of the if/else.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
+import org.apache.flink.table.planner.plan.utils.AggregateInfo;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A sub-class of {@link SourceAbilitySpec} that can not only
serialize/deserialize the aggregation
+ * to/from JSON, but also can push the local aggregate into a {@link
SupportsAggregatePushDown}.
+ */
+@JsonTypeName("AggregatePushDown")
+public class AggregatePushDownSpec extends SourceAbilitySpecBase {
+
+ public static final String FIELD_NAME_INPUT_TYPE = "inputType";
+
+ public static final String FIELD_NAME_GROUPING_SETS = "groupingSets";
+
+ public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls";
+
+ @JsonProperty(FIELD_NAME_INPUT_TYPE)
+ private final RowType inputType;
+
+ @JsonProperty(FIELD_NAME_GROUPING_SETS)
+ private final List<int[]> groupingSets;
+
+ @JsonProperty(FIELD_NAME_AGGREGATE_CALLS)
+ private final List<AggregateCall> aggregateCalls;
+
+ @JsonCreator
+ public AggregatePushDownSpec(
+ @JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType,
+ @JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets,
+ @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall>
aggregateCalls,
+ @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
+ super(producedType);
+
+ this.inputType = inputType;
+ this.groupingSets = new ArrayList<>(checkNotNull(groupingSets));
+ this.aggregateCalls = aggregateCalls;
+ }
+
+ @Override
+ public void apply(DynamicTableSource tableSource, SourceAbilityContext
context) {
+ checkArgument(getProducedType().isPresent());
+ apply(inputType, groupingSets, aggregateCalls,
getProducedType().get(), tableSource);
+ }
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ String extraDigest;
+ String groupingStr = "";
+ int[] grouping = ArrayUtils.addAll(groupingSets.get(0),
groupingSets.get(1));
+ if (grouping.length > 0) {
Review comment:
Here and below, the `if` checks are unnecessary. Iterating over empty
arrays or lists works just fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]