This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5425f [FLINK-12801][table-planner-blink] Set parallelism for batch SQL
6f5425f is described below
commit 6f5425fc9798510fef33ccd7bb81d4b9f59bffa5
Author: 姬平 <[email protected]>
AuthorDate: Tue Jun 11 18:22:32 2019 +0800
[FLINK-12801][table-planner-blink] Set parallelism for batch SQL
This closes #8690
---
.../flink/table/api/PlannerConfigOptions.java | 6 +
.../table/plan/nodes/exec/NodeResourceConfig.java | 81 -----
.../plan/nodes/process/DAGProcessContext.java | 42 +++
.../table/plan/nodes/process/DAGProcessor.java | 34 +++
.../table/plan/nodes/resource/NodeResource.java | 44 +++
.../parallelism/BatchFinalParallelismSetter.java | 112 +++++++
.../parallelism/BatchParallelismProcessor.java | 68 +++++
.../BatchShuffleStageParallelismCalculator.java | 93 ++++++
.../batch/parallelism/NodeResourceConfig.java | 108 +++++++
.../resource/batch/parallelism/ShuffleStage.java | 78 +++++
.../batch/parallelism/ShuffleStageGenerator.java | 148 ++++++++++
.../flink/table/api/BatchTableEnvironment.scala | 10 +-
.../flink/table/api/StreamTableEnvironment.scala | 2 +-
.../apache/flink/table/api/TableEnvironment.scala | 6 +-
.../table/plan/metadata/FlinkRelMdRowCount.scala | 5 +-
.../flink/table/plan/nodes/exec/ExecNode.scala | 12 +-
.../batch/BatchExecBoundedStreamScan.scala | 8 +-
.../plan/nodes/physical/batch/BatchExecCalc.scala | 2 +-
.../nodes/physical/batch/BatchExecCorrelate.scala | 2 +-
.../nodes/physical/batch/BatchExecExpand.scala | 2 +-
.../physical/batch/BatchExecHashAggregate.scala | 8 -
.../batch/BatchExecHashAggregateBase.scala | 11 +-
.../batch/BatchExecHashWindowAggregateBase.scala | 7 +-
.../plan/nodes/physical/batch/BatchExecLimit.scala | 2 +-
.../batch/BatchExecLocalHashAggregate.scala | 3 -
.../batch/BatchExecLocalSortAggregate.scala | 3 -
.../batch/BatchExecLocalSortWindowAggregate.scala | 6 -
.../nodes/physical/batch/BatchExecLookupJoin.scala | 4 +-
.../physical/batch/BatchExecOverAggregate.scala | 5 +-
.../plan/nodes/physical/batch/BatchExecRank.scala | 2 +-
.../plan/nodes/physical/batch/BatchExecSink.scala | 18 +-
.../plan/nodes/physical/batch/BatchExecSort.scala | 9 +-
.../physical/batch/BatchExecSortAggregate.scala | 4 -
.../batch/BatchExecSortAggregateBase.scala | 8 +-
.../nodes/physical/batch/BatchExecSortLimit.scala | 2 +-
.../physical/batch/BatchExecSortMergeJoin.scala | 7 +-
.../batch/BatchExecSortWindowAggregate.scala | 4 -
.../batch/BatchExecSortWindowAggregateBase.scala | 4 +-
.../physical/batch/BatchExecTableSourceScan.scala | 24 +-
.../nodes/physical/batch/BatchExecValues.scala | 5 +-
.../plan/reuse/DeadlockBreakupProcessor.scala | 6 +-
.../flink/table/plan/util/ExecNodePlanDumper.scala | 28 +-
.../plan/nodes/resource/MockNodeTestBase.java | 92 ++++++
.../BatchFinalParallelismSetterTest.java | 125 ++++++++
...BatchShuffleStageParallelismCalculatorTest.java | 100 +++++++
.../parallelism/ShuffleStageGeneratorTest.java | 326 +++++++++++++++++++++
.../plan/nodes/resource/BatchExecResourceTest.xml | 185 ++++++++++++
.../nodes/resource/BatchExecResourceTest.scala | 124 ++++++++
.../table/runtime/batch/sql/TableScanITCase.scala | 4 +-
.../apache/flink/table/util/TableTestBase.scala | 24 +-
.../apache/flink/table/util/testTableSources.scala | 4 +-
.../apache/flink/table/api/TableConfigOptions.java | 33 ++-
52 files changed, 1864 insertions(+), 186 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index c2bdf85..9441ede 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -126,4 +126,10 @@ public class PlannerConfigOptions {
.defaultValue(false)
.withDescription("Disable union all as
breakpoint when constructing RelNodeBlock");
+ public static final ConfigOption<Long> SQL_OPTIMIZER_ROWS_PER_LOCALAGG =
+ key("sql.optimizer.rows-per-localAgg")
+ .defaultValue(1000000L)
+ .withDescription("Sets how many rows one localAgg processes. " +
+ "The planner infers the aggregation degree from it to decide whether to use a localAgg.");
+
}
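For illustration only (not part of this commit): a job could override this option through the blink planner's TableConfig. The tEnv handle and the value 2,000,000 below are assumptions.

    // Hypothetical usage sketch: raise the per-localAgg row target to 2M rows,
    // assuming a blink BatchTableEnvironment tEnv is already in scope.
    tEnv.getConfig().getConf().setLong(
            PlannerConfigOptions.SQL_OPTIMIZER_ROWS_PER_LOCALAGG, 2_000_000L);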
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/exec/NodeResourceConfig.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/exec/NodeResourceConfig.java
deleted file mode 100644
index f5f2107..0000000
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/exec/NodeResourceConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes.exec;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableConfigOptions;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-/**
- * Deal with resource config for {@link
org.apache.flink.table.plan.nodes.exec.ExecNode}.
- */
-public class NodeResourceConfig {
-
- public static final ConfigOption<Integer>
SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MIN =
- key("sql.resource.infer.operator.parallelism.min")
- .defaultValue(1)
- .withDescription("Sets min parallelism
for operators.");
-
- /**
- * Gets the config max num of operator parallelism.
- *
- * @param tableConf Configuration.
- * @return the config max num of operator parallelism.
- */
- public static int getOperatorMaxParallelism(Configuration tableConf) {
- return
tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MAX);
- }
-
- /**
- * Gets the config min num of operator parallelism.
- *
- * @param tableConf Configuration.
- * @return the config max num of operator parallelism.
- */
- public static int getOperatorMinParallelism(Configuration tableConf) {
- return
tableConf.getInteger(SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MIN);
- }
-
- /**
- * Gets the config row count that one partition processes.
- *
- * @param tableConf Configuration.
- * @return the config row count that one partition processes.
- */
- public static long getRowCountPerPartition(Configuration tableConf) {
- return
tableConf.getLong(TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION);
- }
-
- /**
- * Calculates operator parallelism based on rowcount of the operator.
- *
- * @param rowCount rowCount of the operator
- * @param tableConf Configuration.
- * @return the result of operator parallelism.
- */
- public static int calOperatorParallelism(double rowCount, Configuration
tableConf) {
- int maxParallelism = getOperatorMaxParallelism(tableConf);
- int minParallelism = getOperatorMinParallelism(tableConf);
- int resultParallelism = (int) (rowCount /
getRowCountPerPartition(tableConf));
- return Math.max(Math.min(resultParallelism, maxParallelism),
minParallelism);
- }
-
-}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessContext.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessContext.java
new file mode 100644
index 0000000..3d825e0
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessContext.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.process;
+
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * Context for {@link DAGProcessor}s to process the DAG.
+ */
+public class DAGProcessContext {
+
+ private final TableEnvironment tableEnvironment;
+
+ public DAGProcessContext(TableEnvironment tableEnvironment) {
+ this.tableEnvironment = tableEnvironment;
+ }
+
+ /**
+ * Gets the {@link TableEnvironment}: a {@link org.apache.flink.table.api.BatchTableEnvironment} for batch jobs
+ * and a {@link org.apache.flink.table.api.StreamTableEnvironment} for stream jobs.
+ */
+ public TableEnvironment getTableEnvironment() {
+ return tableEnvironment;
+ }
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessor.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessor.java
new file mode 100644
index 0000000..fd7af93
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.process;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+
+import java.util.List;
+
+/**
+ * A DAG-processing plugin; implementations can set resources of the DAG or change other node info.
+ */
+public interface DAGProcessor {
+
+ /**
+ * Given a DAG, processes it and returns the resulting DAG.
+ */
+ List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context);
+}
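As a rough sketch of the plugin contract above (illustrative only, not part of this commit): a custom processor could walk the DAG from the sinks and stamp a fixed parallelism on every node. FixedParallelismProcessor and the constant 4 are made up for the example.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.flink.table.plan.nodes.exec.ExecNode;

    // Illustrative DAGProcessor: stamps a fixed parallelism on every reachable node.
    public class FixedParallelismProcessor implements DAGProcessor {

        @Override
        public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, DAGProcessContext context) {
            Set<ExecNode<?, ?>> visited = new HashSet<>();
            sinkNodes.forEach(node -> visit(node, visited));
            return sinkNodes;
        }

        private void visit(ExecNode<?, ?> node, Set<ExecNode<?, ?>> visited) {
            if (!visited.add(node)) {
                return;
            }
            node.getResource().setParallelism(4); // hypothetical fixed value
            node.getInputNodes().forEach(input -> visit(input, visited));
        }
    }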
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResource.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResource.java
new file mode 100644
index 0000000..91db90a
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource;
+
+/**
+ * Resources of a node: currently only parallelism. Other resource dimensions can be added as needed.
+ */
+public class NodeResource {
+
+ // node parallelism
+ private int parallelism = -1;
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("{");
+ sb.append("parallelism=").append(parallelism);
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetter.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetter.java
new file mode 100644
index 0000000..3c574e5
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecValues;
+
+import org.apache.calcite.rel.RelDistribution;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Sets final parallelism up front where needed. Once the parallelism of a node is marked final,
+ * it will not be changed by any other parallelism calculator.
+ */
+public class BatchFinalParallelismSetter {
+
+ private final TableEnvironment tEnv;
+ private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
+ private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new
HashMap<>();
+
+ private BatchFinalParallelismSetter(TableEnvironment tEnv) {
+ this.tEnv = tEnv;
+ }
+
+ /**
+ * Finds the nodes whose parallelism must be set to a final value.
+ */
+ public static Map<ExecNode<?, ?>, Integer> calculate(TableEnvironment
tEnv, List<ExecNode<?, ?>> sinkNodes) {
+ BatchFinalParallelismSetter setter = new
BatchFinalParallelismSetter(tEnv);
+ sinkNodes.forEach(setter::calculate);
+ return setter.finalParallelismNodeMap;
+ }
+
+ private void calculate(ExecNode<?, ?> batchExecNode) {
+ if (!calculatedNodeSet.add(batchExecNode)) {
+ return;
+ }
+ if (batchExecNode instanceof BatchExecTableSourceScan) {
+ calculateTableSource((BatchExecTableSourceScan)
batchExecNode);
+ } else if (batchExecNode instanceof BatchExecBoundedStreamScan)
{
+ calculateBoundedStreamScan((BatchExecBoundedStreamScan)
batchExecNode);
+ } else if (batchExecNode instanceof BatchExecValues) {
+ calculateValues((BatchExecValues) batchExecNode);
+ } else {
+ calculateSingleton(batchExecNode);
+ }
+ }
+
+ private void calculateTableSource(BatchExecTableSourceScan
tableSourceScan) {
+ StreamTransformation transformation =
tableSourceScan.getSourceTransformation(tEnv.streamEnv());
+ if (transformation.getMaxParallelism() > 0) {
+ finalParallelismNodeMap.put(tableSourceScan,
transformation.getMaxParallelism());
+ }
+ }
+
+ private void calculateBoundedStreamScan(BatchExecBoundedStreamScan
boundedStreamScan) {
+ StreamTransformation transformation =
boundedStreamScan.getSourceTransformation();
+ int parallelism = transformation.getParallelism();
+ if (parallelism <= 0) {
+ parallelism = tEnv.streamEnv().getParallelism();
+ }
+ finalParallelismNodeMap.put(boundedStreamScan, parallelism);
+ }
+
+ private void calculateSingleton(ExecNode<?, ?> execNode) {
+ calculateInputs(execNode);
+ for (ExecNode<?, ?> inputNode : execNode.getInputNodes()) {
+ if (inputNode instanceof BatchExecExchange) {
+ // set parallelism to 1 for GlobalAggregate and other global nodes.
+ if (((BatchExecExchange)
inputNode).getDistribution().getType() == RelDistribution.Type.SINGLETON) {
+ finalParallelismNodeMap.put(execNode,
1);
+ return;
+ }
+ }
+ }
+ }
+
+ private void calculateValues(BatchExecValues valuesBatchExec) {
+ finalParallelismNodeMap.put(valuesBatchExec, 1);
+ }
+
+ private void calculateInputs(ExecNode<?, ?> node) {
+ node.getInputNodes().forEach(this::calculate);
+ }
+}
+
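Usage sketch (illustrative; tEnv and rootNodes assumed in scope, mirroring how BatchParallelismProcessor below invokes the setter):

    // Nodes such as Values, bounded stream scans, and nodes behind a
    // SINGLETON exchange come back with a parallelism that later
    // calculators must not change.
    Map<ExecNode<?, ?>, Integer> finalParallelisms =
            BatchFinalParallelismSetter.calculate(tEnv, rootNodes);
    // e.g. a BatchExecValues node maps to 1, a bounded scan to its source parallelism.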
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchParallelismProcessor.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchParallelismProcessor.java
new file mode 100644
index 0000000..64b7c28
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchParallelismProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
+import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor for calculating parallelism for {@link BatchExecNode}.
+ */
+public class BatchParallelismProcessor implements DAGProcessor {
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ sinkNodes.forEach(s -> Preconditions.checkArgument(s instanceof
BatchExecNode));
+ TableEnvironment tEnv = context.getTableEnvironment();
+ List<ExecNode<?, ?>> rootNodes = filterSinkNodes(sinkNodes);
+ Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap =
BatchFinalParallelismSetter.calculate(tEnv, rootNodes);
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap =
ShuffleStageGenerator.generate(rootNodes, nodeToFinalParallelismMap);
+ new
BatchShuffleStageParallelismCalculator(tEnv.getConfig().getConf(),
tEnv.streamEnv().getParallelism()).calculate(nodeShuffleStageMap.values());
+ for (ExecNode<?, ?> node : nodeShuffleStageMap.keySet()) {
+
node.getResource().setParallelism(nodeShuffleStageMap.get(node).getParallelism());
+ }
+ return sinkNodes;
+ }
+
+ /**
+ * Filters out sink nodes because their parallelism is calculated after translateToPlan:
+ * the transformations generated by BatchExecSink carry too many uncertainties.
+ * Filtering here makes the later processing simpler.
+ */
+ private List<ExecNode<?, ?>> filterSinkNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ List<ExecNode<?, ?>> rootNodes = new ArrayList<>();
+ sinkNodes.forEach(s -> {
+ if (s instanceof BatchExecSink) {
+ rootNodes.add(s.getInputNodes().get(0));
+ } else {
+ rootNodes.add(s);
+ }
+ });
+ return rootNodes;
+ }
+}
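A hedged sketch of running the processor by hand (tableEnv and sinkNodes assumed in scope; this commit wires the processor into translateToExecNodeDag in BatchTableEnvironment further below):

    DAGProcessContext context = new DAGProcessContext(tableEnv);
    List<ExecNode<?, ?>> dag = new BatchParallelismProcessor().process(sinkNodes, context);
    // Every non-sink node in the DAG now carries its calculated parallelism in getResource().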
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculator.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculator.java
new file mode 100644
index 0000000..f72bdb4
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Parallelism calculator for shuffle stage.
+ */
+public class BatchShuffleStageParallelismCalculator {
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchShuffleStageParallelismCalculator.class);
+ private final Configuration tableConf;
+ private final int envParallelism;
+
+ public BatchShuffleStageParallelismCalculator(Configuration tableConf,
int envParallelism) {
+ this.tableConf = tableConf;
+ this.envParallelism = envParallelism;
+ }
+
+ public void calculate(Collection<ShuffleStage> shuffleStages) {
+ Set<ShuffleStage> shuffleStageSet = new
HashSet<>(shuffleStages);
+ shuffleStageSet.forEach(this::calculate);
+ }
+
+ @VisibleForTesting
+ protected void calculate(ShuffleStage shuffleStage) {
+ if (shuffleStage.isFinalParallelism()) {
+ return;
+ }
+ Set<ExecNode<?, ?>> nodeSet = shuffleStage.getExecNodeSet();
+ int maxSourceParallelism = -1;
+ for (ExecNode<?, ?> node : nodeSet) {
+ if (node instanceof BatchExecTableSourceScan) {
+ int result =
calculateSource((BatchExecTableSourceScan) node);
+ if (result > maxSourceParallelism) {
+ maxSourceParallelism = result;
+ }
+ }
+ }
+ if (maxSourceParallelism > 0) {
+ shuffleStage.setParallelism(maxSourceParallelism,
false);
+ } else {
+
shuffleStage.setParallelism(NodeResourceConfig.getOperatorDefaultParallelism(getTableConf(),
envParallelism), false);
+ }
+ }
+
+ private int calculateSource(BatchExecTableSourceScan tableSourceScan) {
+ boolean infer =
!NodeResourceConfig.getInferMode(tableConf).equals(NodeResourceConfig.InferMode.NONE);
+ LOG.info("infer source partitions num: " + infer);
+ if (infer) {
+ double rowCount =
tableSourceScan.getEstimatedRowCount();
+ LOG.info("source row count is : " + rowCount);
+ long rowsPerPartition =
NodeResourceConfig.getInferRowCountPerPartition(tableConf);
+ int maxNum =
NodeResourceConfig.getSourceMaxParallelism(tableConf);
+ return Math.min(maxNum,
+ Math.max(
+ (int) (rowCount /
rowsPerPartition),
+ 1));
+ } else {
+ return
NodeResourceConfig.getSourceParallelism(tableConf, envParallelism);
+ }
+ }
+
+ private Configuration getTableConf() {
+ return this.tableConf;
+ }
+}
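To make the source inference above concrete, a worked sketch with illustrative numbers (all values are assumptions):

    // Infer mode ONLY_SOURCE, estimated 5M rows, 1M rows per partition, cap of 100:
    double rowCount = 5_000_000d;
    long rowsPerPartition = 1_000_000L;
    int maxNum = 100;
    int sourceParallelism = Math.min(maxNum, Math.max((int) (rowCount / rowsPerPartition), 1));
    // sourceParallelism == min(100, max(5, 1)) == 5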
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/NodeResourceConfig.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/NodeResourceConfig.java
new file mode 100644
index 0000000..f67a6aa
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/NodeResourceConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfigOptions;
+
+/**
+ * Deals with the resource configuration for {@link org.apache.flink.table.plan.nodes.exec.ExecNode}.
+ */
+public class NodeResourceConfig {
+
+ /**
+ * Number of bytes per megabyte.
+ */
+ public static final long SIZE_IN_MB = 1024L * 1024;
+
+ /**
+ * Gets the config parallelism for source.
+ * @param tableConf Configuration.
+ * @return the config parallelism for source.
+ */
+ public static int getSourceParallelism(Configuration tableConf, int
envParallelism) {
+ int parallelism = tableConf.getInteger(
+
TableConfigOptions.SQL_RESOURCE_SOURCE_PARALLELISM);
+ if (parallelism <= 0) {
+ parallelism = getOperatorDefaultParallelism(tableConf,
envParallelism);
+ }
+ return parallelism;
+ }
+
+ /**
+ * Gets the config parallelism for sink. If it is not set, return -1.
+ * @param tableConf Configuration.
+ * @return the config parallelism for sink.
+ */
+ public static int getSinkParallelism(Configuration tableConf) {
+ return
tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_SINK_PARALLELISM);
+ }
+
+ /**
+ * Gets the config max num of source parallelism.
+ * @param tableConf Configuration.
+ * @return the config max num of source parallelism.
+ */
+ public static int getSourceMaxParallelism(Configuration tableConf) {
+ return tableConf.getInteger(
+
TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX);
+ }
+
+ /**
+ * Gets the config row count that one partition processes.
+ * @param tableConf Configuration.
+ * @return the config row count that one partition processes.
+ */
+ public static long getInferRowCountPerPartition(Configuration
tableConf) {
+ return tableConf.getLong(
+
TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION);
+ }
+
+ /**
+ * Gets default parallelism of operator.
+ * @param tableConf Configuration.
+ * @return default parallelism of operator.
+ */
+ public static int getOperatorDefaultParallelism(Configuration
tableConf, int envParallelism) {
+ int parallelism = tableConf.getInteger(
+
TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM);
+ if (parallelism <= 0) {
+ parallelism = envParallelism;
+ }
+ return parallelism;
+ }
+
+ /**
+ * Infer resource mode.
+ */
+ public enum InferMode {
+ NONE, ONLY_SOURCE
+ }
+
+ public static InferMode getInferMode(Configuration tableConf) {
+ String config = tableConf.getString(
+ TableConfigOptions.SQL_RESOURCE_INFER_MODE);
+ try {
+ return InferMode.valueOf(config);
+ } catch (IllegalArgumentException ex) {
+ throw new IllegalArgumentException("Infer mode can only
be set: NONE or ONLY_SOURCE.");
+ }
+ }
+
+}
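A small sketch of resolving the infer mode (the configured value is an assumption):

    Configuration conf = new Configuration();
    conf.setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, "ONLY_SOURCE");
    NodeResourceConfig.InferMode mode = NodeResourceConfig.getInferMode(conf);
    // mode == InferMode.ONLY_SOURCE; any other string throws IllegalArgumentException.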
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStage.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStage.java
new file mode 100644
index 0000000..80aa778
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * There is no shuffle when data is transferred within a shuffleStage.
+ */
+public class ShuffleStage {
+
+ private final Set<ExecNode<?, ?>> execNodeSet = new LinkedHashSet<>();
+
+ // parallelism of this shuffleStage.
+ private int parallelism = -1;
+
+ // whether this parallelism is final; a final parallelism cannot be changed.
+ private boolean isFinalParallelism = false;
+
+ public void addNode(ExecNode<?, ?> node) {
+ execNodeSet.add(node);
+ }
+
+ public void addNodeSet(Set<ExecNode<?, ?>> nodeSet) {
+ execNodeSet.addAll(nodeSet);
+ }
+
+ public void removeNode(ExecNode<?, ?> node) {
+ this.execNodeSet.remove(node);
+ }
+
+ public Set<ExecNode<?, ?>> getExecNodeSet() {
+ return this.execNodeSet;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public void setParallelism(int parallelism, boolean finalParallelism) {
+ if (this.isFinalParallelism) {
+ if (finalParallelism && this.parallelism !=
parallelism) {
+ throw new IllegalArgumentException("both fixed
parallelism are not equal, old: " + this.parallelism + ", new: " + parallelism);
+ }
+ } else {
+ if (finalParallelism) {
+ this.parallelism = parallelism;
+ this.isFinalParallelism = true;
+ } else {
+ this.parallelism = Math.max(this.parallelism,
parallelism);
+ }
+ }
+ }
+
+ public boolean isFinalParallelism() {
+ return isFinalParallelism;
+ }
+
+}
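The final-vs-derived rules in setParallelism can be summarized with a short sketch:

    ShuffleStage stage = new ShuffleStage();
    stage.setParallelism(8, false);  // derived: parallelism becomes max(-1, 8) = 8
    stage.setParallelism(4, false);  // derived: max(8, 4), stays 8
    stage.setParallelism(2, true);   // final: overrides to 2 and locks the value
    stage.setParallelism(16, false); // ignored: parallelism is already final
    // stage.getParallelism() == 2; a conflicting final value would throw.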
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGenerator.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGenerator.java
new file mode 100644
index 0000000..4e111e0
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGenerator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
+
+import org.apache.calcite.rel.RelDistribution;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Groups exec nodes into shuffleStages according to {@link BatchExecExchange}.
+ * If data is shuffled between two adjacent exec nodes,
+ * they belong to different shuffleStages.
+ * If there is no data shuffle between two adjacent exec nodes but
+ * they have different final parallelism, they also belong to different shuffleStages.
+ */
+public class ShuffleStageGenerator {
+
+ private final Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap =
new LinkedHashMap<>();
+ private final Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap;
+
+ private ShuffleStageGenerator(Map<ExecNode<?, ?>, Integer>
nodeToFinalParallelismMap) {
+ this.nodeToFinalParallelismMap = nodeToFinalParallelismMap;
+ }
+
+ public static Map<ExecNode<?, ?>, ShuffleStage>
generate(List<ExecNode<?, ?>> sinkNodes, Map<ExecNode<?, ?>, Integer>
finalParallelismNodeMap) {
+ ShuffleStageGenerator generator = new
ShuffleStageGenerator(finalParallelismNodeMap);
+ sinkNodes.forEach(generator::buildShuffleStages);
+ Map<ExecNode<?, ?>, ShuffleStage> result =
generator.getNodeShuffleStageMap();
+ result.values().forEach(s -> {
+ List<ExecNode<?, ?>> virtualNodeList =
s.getExecNodeSet().stream().filter(ShuffleStageGenerator::isVirtualNode).collect(toList());
+ virtualNodeList.forEach(s::removeNode);
+ });
+ return generator.getNodeShuffleStageMap().entrySet().stream()
+ .filter(x -> !isVirtualNode(x.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue,
+ (e1, e2) -> e1,
+ LinkedHashMap::new));
+ }
+
+ private void buildShuffleStages(ExecNode<?, ?> execNode) {
+ if (nodeShuffleStageMap.containsKey(execNode)) {
+ return;
+ }
+ for (ExecNode<?, ?> input : execNode.getInputNodes()) {
+ buildShuffleStages(input);
+ }
+
+ if (execNode.getInputNodes().isEmpty()) {
+ // source node
+ ShuffleStage shuffleStage = new ShuffleStage();
+ shuffleStage.addNode(execNode);
+ if (nodeToFinalParallelismMap.containsKey(execNode)) {
+
shuffleStage.setParallelism(nodeToFinalParallelismMap.get(execNode), true);
+ }
+ nodeShuffleStageMap.put(execNode, shuffleStage);
+ } else if (execNode instanceof BatchExecExchange &&
+ !(((BatchExecExchange)
execNode).getDistribution().getType() ==
RelDistribution.Type.RANGE_DISTRIBUTED)) {
+ // do nothing
+ } else {
+ Set<ShuffleStage> inputShuffleStages =
getInputShuffleStages(execNode);
+ Integer parallelism =
nodeToFinalParallelismMap.get(execNode);
+ ShuffleStage inputShuffleStage =
mergeInputShuffleStages(inputShuffleStages, parallelism);
+ inputShuffleStage.addNode(execNode);
+ nodeShuffleStageMap.put(execNode, inputShuffleStage);
+ }
+ }
+
+ private ShuffleStage mergeInputShuffleStages(Set<ShuffleStage>
shuffleStageSet, Integer parallelism) {
+ if (parallelism != null) {
+ ShuffleStage resultShuffleStage = new ShuffleStage();
+ resultShuffleStage.setParallelism(parallelism, true);
+ for (ShuffleStage shuffleStage : shuffleStageSet) {
+ if (!shuffleStage.isFinalParallelism() ||
shuffleStage.getParallelism() == parallelism) {
+ mergeShuffleStage(resultShuffleStage,
shuffleStage);
+ }
+ }
+ return resultShuffleStage;
+ } else {
+ ShuffleStage resultShuffleStage =
shuffleStageSet.stream()
+
.filter(ShuffleStage::isFinalParallelism)
+
.max(Comparator.comparing(ShuffleStage::getParallelism))
+ .orElse(new ShuffleStage());
+ for (ShuffleStage shuffleStage : shuffleStageSet) {
+ if (!shuffleStage.isFinalParallelism() ||
shuffleStage.getParallelism() == resultShuffleStage.getParallelism()) {
+ mergeShuffleStage(resultShuffleStage,
shuffleStage);
+ }
+ }
+ return resultShuffleStage;
+ }
+ }
+
+ private void mergeShuffleStage(ShuffleStage shuffleStage, ShuffleStage
other) {
+ Set<ExecNode<?, ?>> nodeSet = other.getExecNodeSet();
+ shuffleStage.addNodeSet(nodeSet);
+ for (ExecNode<?, ?> r : nodeSet) {
+ nodeShuffleStageMap.put(r, shuffleStage);
+ }
+ }
+
+ private Set<ShuffleStage> getInputShuffleStages(ExecNode<?, ?> node) {
+ Set<ShuffleStage> shuffleStageSet = new HashSet<>();
+ for (ExecNode<?, ?> input : node.getInputNodes()) {
+ ShuffleStage oneInputShuffleStage = nodeShuffleStageMap.get(input);
+ if (oneInputShuffleStage != null) {
+ shuffleStageSet.add(oneInputShuffleStage);
+ }
+ }
+ return shuffleStageSet;
+ }
+
+ private static boolean isVirtualNode(ExecNode<?, ?> node) {
+ return node instanceof BatchExecUnion;
+ }
+
+ private Map<ExecNode<?, ?>, ShuffleStage> getNodeShuffleStageMap() {
+ return nodeShuffleStageMap;
+ }
+}
+
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 05ba98e..79ad59c 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.api.graph.{StreamGraph,
StreamGraphGenerator}
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.plan.nodes.process.DAGProcessContext
+import
org.apache.flink.table.plan.nodes.resource.batch.parallelism.BatchParallelismProcessor
import
org.apache.flink.table.plan.optimize.{BatchCommonSubGraphBasedOptimizer,
Optimizer}
import org.apache.flink.table.plan.reuse.DeadlockBreakupProcessor
import org.apache.flink.table.plan.schema.{BatchTableSourceTable,
TableSourceSinkTable, TableSourceTable}
@@ -50,9 +52,9 @@ import _root_.scala.collection.JavaConversions._
* @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
*/
class BatchTableEnvironment(
- val streamEnv: StreamExecutionEnvironment,
+ val execEnv: StreamExecutionEnvironment,
config: TableConfig)
- extends TableEnvironment(config) {
+ extends TableEnvironment(execEnv, config) {
// prefix for unique table names.
override private[flink] val tableNamePrefix = "_DataStreamTable_"
@@ -120,8 +122,10 @@ class BatchTableEnvironment(
override private[flink] def translateToExecNodeDag(rels: Seq[RelNode]):
Seq[ExecNode[_, _]] = {
val nodeDag = super.translateToExecNodeDag(rels)
+ val context = new DAGProcessContext(this)
// breakup deadlock
- new DeadlockBreakupProcessor().process(nodeDag)
+ val postNodeDag = new DeadlockBreakupProcessor().process(nodeDag, context)
+ new BatchParallelismProcessor().process(postNodeDag, context)
}
/**
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8269613..2f31ef3 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -67,7 +67,7 @@ import _root_.scala.collection.JavaConversions._
abstract class StreamTableEnvironment(
private[flink] val execEnv: StreamExecutionEnvironment,
config: TableConfig)
- extends TableEnvironment(config) {
+ extends TableEnvironment(execEnv, config) {
// prefix for unique table names.
override private[flink] val tableNamePrefix = "_DataStreamTable_"
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 82d1655..3dbf25c 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -70,9 +70,13 @@ import _root_.scala.collection.mutable
/**
* The abstract base class for batch and stream TableEnvironments.
*
+ * @param streamEnv The [[JavaStreamExecEnv]] which is wrapped in this
+ * [[TableEnvironment]].
* @param config The configuration of the TableEnvironment
*/
-abstract class TableEnvironment(val config: TableConfig) {
+abstract class TableEnvironment(
+ val streamEnv: JavaStreamExecEnv,
+ val config: TableConfig) {
protected val DEFAULT_JOB_NAME = "Flink Exec Table Job"
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
index 3895292..60bc9f4 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
@@ -26,10 +26,10 @@ import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.calcite.util._
import org.apache.flink.table.JDouble
+import org.apache.flink.table.api.PlannerConfigOptions
import org.apache.flink.table.calcite.FlinkContext
import org.apache.flink.table.plan.logical.{LogicalWindow, SlidingGroupWindow,
TumblingGroupWindow}
import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank,
WindowAggregate}
-import org.apache.flink.table.plan.nodes.exec.NodeResourceConfig
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.stats.ValueInterval
import org.apache.flink.table.plan.util.AggregateUtil.{hasTimeIntervalType,
toLong}
@@ -171,7 +171,8 @@ class FlinkRelMdRowCount private extends
MetadataHandler[BuiltInMetadata.RowCoun
} else {
val inputRowCnt = mq.getRowCount(input)
val config =
rel.getCluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
- val parallelism = NodeResourceConfig.calOperatorParallelism(inputRowCnt,
config.getConf)
+ val parallelism = (inputRowCnt /
+
config.getConf.getLong(PlannerConfigOptions.SQL_OPTIMIZER_ROWS_PER_LOCALAGG) +
1).toInt
if (parallelism == 1) {
ndvOfGroupKeysOnGlobalAgg
} else if (grouping.isEmpty) {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
index d86ecd5..9006ab7 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.nodes.exec
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+import org.apache.flink.table.plan.nodes.resource.NodeResource
import java.util
@@ -33,11 +34,21 @@ import java.util
trait ExecNode[E <: TableEnvironment, T] {
/**
+ * Defines how much resource the node will take.
+ */
+ private val resource: NodeResource = new NodeResource
+
+ /**
* The [[StreamTransformation]] translated from this node.
*/
private var transformation: StreamTransformation[T] = _
/**
+ * Get node resource.
+ */
+ def getResource = resource
+
+ /**
* Translates this node into a Flink operator.
*
* <p>NOTE: returns same translate result if called multiple times.
@@ -83,5 +94,4 @@ trait ExecNode[E <: TableEnvironment, T] {
def accept(visitor: ExecNodeVisitor): Unit = {
visitor.visit(this)
}
-
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index 4bcb025..8c26e9f 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -87,8 +87,9 @@ class BatchExecBoundedStreamScan(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
val config = tableEnv.getConfig
val batchTransform = boundedStreamTable.dataStream.getTransformation
+ batchTransform.setParallelism(getResource.getParallelism)
if (needInternalConversion) {
- ScanUtil.convertToInternalRow(
+ val conversionTransform = ScanUtil.convertToInternalRow(
CodeGeneratorContext(config),
batchTransform,
boundedStreamTable.fieldIndexes,
@@ -97,11 +98,16 @@ class BatchExecBoundedStreamScan(
getTable.getQualifiedName,
config,
None)
+ conversionTransform.setParallelism(getResource.getParallelism)
+ conversionTransform
} else {
batchTransform.asInstanceOf[StreamTransformation[BaseRow]]
}
}
+ def getSourceTransformation: StreamTransformation[_] =
+ boundedStreamTable.dataStream.getTransformation
+
def needInternalConversion: Boolean = {
ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
ScanUtil.needsConversion(
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
index f0909eb..0b3ca4e 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -160,6 +160,6 @@ class BatchExecCalc(
RelExplainUtil.calcToString(calcProgram, getExpressionString),
operator,
BaseRowTypeInfo.of(outputType),
-
config.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+ getResource.getParallelism)
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
index 16f0cb5..9623b37 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -203,7 +203,7 @@ class BatchExecCorrelate(
condition,
outputRowType,
joinType,
-
tableEnv.getConfig.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM),
+ getResource.getParallelism,
retainHeader = false,
getExpressionString,
"BatchExecCorrelate")
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
index 6c01907..3327e1d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
@@ -104,7 +104,7 @@ class BatchExecExpand(
operatorName,
operator,
BaseRowTypeInfo.of(outputType),
-
config.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+ getResource.getParallelism)
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
index 3793ed3..105c9b7 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
@@ -154,12 +154,4 @@ class BatchExecHashAggregate(
val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
aggOperatorName(aggregateNamePrefix + "HashAggregate")
}
-
- override def getParallelism(input: StreamTransformation[BaseRow], conf:
TableConfig): Int = {
- if (isFinal && grouping.length == 0) {
- 1
- } else {
-
conf.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM)
- }
- }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index 85ebbc0..aee08ad 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.streaming.api.transformations.{OneInputTransformation,
StreamTransformation}
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig,
TableConfigOptions}
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfigOptions}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator,
HashAggCodeGenerator}
@@ -27,6 +27,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.cost.FlinkCost._
import org.apache.flink.table.plan.cost.FlinkCostFactory
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
import
org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.FlinkRelMdUtil
import org.apache.flink.table.runtime.CodeGenOperatorFactory
@@ -116,8 +117,6 @@ abstract class BatchExecHashAggregateBase(
def getOperatorName: String
- def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig):
Int
-
override def translateToPlanInternal(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
val input = getInputNodes.get(0).translateToPlan(tableEnv)
@@ -134,9 +133,9 @@ abstract class BatchExecHashAggregateBase(
ctx, relBuilder, aggInfos, inputType, outputType, isMerge, isFinal,
"NoGrouping")
} else {
val reservedManagedMem = tableEnv.config.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) *
TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) *
NodeResourceConfig.SIZE_IN_MB
val maxManagedMem = tableEnv.config.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) *
TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) *
NodeResourceConfig.SIZE_IN_MB
new HashAggCodeGenerator(
ctx, relBuilder, aggInfos, inputType, outputType, grouping,
auxGrouping, isMerge, isFinal
).genWithKeys(reservedManagedMem, maxManagedMem)
@@ -147,6 +146,6 @@ abstract class BatchExecHashAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getParallelism(input, tableEnv.config))
+ getResource.getParallelism)
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index cb87455..15822b9 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.logical.LogicalWindow
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
import
org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.FlinkRelMdUtil
import org.apache.flink.table.runtime.CodeGenOperatorFactory
@@ -132,9 +133,9 @@ abstract class BatchExecHashWindowAggregateBase(
val (windowSize: Long, slideSize: Long) =
WindowCodeGenerator.getWindowDef(window)
val reservedManagedMem = tableEnv.config.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) *
TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) *
NodeResourceConfig.SIZE_IN_MB
val maxManagedMem = tableEnv.config.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) *
TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) *
NodeResourceConfig.SIZE_IN_MB
val generatedOperator = new HashWindowCodeGenerator(
ctx, relBuilder, window, inputTimeFieldIndex,
@@ -148,6 +149,6 @@ abstract class BatchExecHashWindowAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
-
tableEnv.getConfig.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+ getResource.getParallelism)
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
index 3944461..8a98dbc 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
@@ -110,7 +110,7 @@ class BatchExecLimit(
getOperatorName,
operator,
inputType,
- if (isGlobal) 1 else input.getParallelism)
+ getResource.getParallelism)
}
private def getOperatorName = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
index b475f46..ac3e8e0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
@@ -133,7 +133,4 @@ class BatchExecLocalHashAggregate(
}
override def getOperatorName: String = aggOperatorName("LocalHashAggregate")
-
- override def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int =
- input.getParallelism
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
index e62c850..ce85f2f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
@@ -142,7 +142,4 @@ class BatchExecLocalSortAggregate(
override def getOperatorName: String = aggOperatorName("LocalSortAggregate")
- override def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int =
- input.getParallelism
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
index 3839755..d8b68fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
@@ -19,10 +19,7 @@
package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.logical.LogicalWindow
@@ -91,7 +88,4 @@ class BatchExecLocalSortWindowAggregate(
override def getDamBehavior: DamBehavior = DamBehavior.MATERIALIZING
override def getOperatorName: String = "LocalSortWindowAggregateBatchExec"
-
- override def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int =
- input.getParallelism
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
index 694db12..d6303ce 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
@@ -89,14 +89,12 @@ class BatchExecLookupJoin(
val inputTransformation = getInputNodes.get(0).translateToPlan(tableEnv)
.asInstanceOf[StreamTransformation[BaseRow]]
- val defaultParallelism = tableEnv.getConfig.getConf
- .getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM)
val transformation = translateToPlanInternal(
inputTransformation,
tableEnv.streamEnv,
tableEnv.config,
tableEnv.getRelBuilder)
- transformation.setParallelism(defaultParallelism)
+ transformation.setParallelism(getResource.getParallelism)
transformation
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index e7b1a4e..cb34e27 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistri
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.physical.batch.OverWindowMode.OverWindowMode
+import org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
import org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
import org.apache.flink.table.plan.util.OverAggregateUtil.getLongBoundary
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, OverAggregateUtil, RelExplainUtil}
@@ -405,13 +406,13 @@ class BatchExecOverAggregate(
val windowFrames = createOverWindowFrames(tableEnv)
new BufferDataOverWindowOperator(
tableEnv.getConfig.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * TableConfigOptions.SIZE_IN_MB,
+ TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB,
windowFrames,
genComparator,
inputType.getChildren.forall(t => BinaryRow.isInFixedLengthPart(t)))
}
new OneInputTransformation(
- input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType),
input.getParallelism)
+ input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType),
getResource.getParallelism)
}
def createOverWindowFrames(tableEnv: BatchTableEnvironment): Array[OverWindowFrame] = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
index 2faa699..1dc1be8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
@@ -288,7 +288,7 @@ class BatchExecRank(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- input.getParallelism)
+ getResource.getParallelism)
}
private def getOperatorName: String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 22d88d1..f7565a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.calcite.Sink
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
import org.apache.flink.table.sinks.{BatchTableSink, DataStreamTableSink, TableSink}
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import org.apache.flink.table.typeutils.BaseRowTypeInfo
@@ -80,9 +81,22 @@ class BatchExecSink[T](
case batchTableSink: BatchTableSink[T] =>
val transformation = translateToStreamTransformation(withChangeFlag = false, tableEnv)
val boundedStream = new DataStream(tableEnv.streamEnv, transformation)
- batchTableSink.emitBoundedStream(
+ val sinkTransformation = batchTableSink.emitBoundedStream(
boundedStream, tableEnv.getConfig, tableEnv.streamEnv.getConfig).getTransformation
+ if (sinkTransformation.getMaxParallelism > 0) {
+ sinkTransformation.setParallelism(sinkTransformation.getMaxParallelism)
+ } else {
+ val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+ tableEnv.getConfig.getConf)
+ if (configSinkParallelism > 0) {
+ sinkTransformation.setParallelism(configSinkParallelism)
+ } else if (boundedStream.getParallelism > 0) {
+ sinkTransformation.setParallelism(boundedStream.getParallelism)
+ }
+ }
+ sinkTransformation
+
case streamTableSink: DataStreamTableSink[T] =>
// In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we
// insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch table
@@ -133,6 +147,4 @@ class BatchExecSink[T](
"This is a bug and should not happen. Please file an issue.")
}
}
-
-
}
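Note on the sink hunk above: the sink's parallelism is now resolved by a three-step fallback instead of always taking a configured default. A minimal Java sketch of that decision order (the method name and signature are illustrative, not the planner's API):

    // 1. a positive max parallelism pinned on the sink transformation wins;
    // 2. otherwise a positive configured sink parallelism is applied;
    // 3. otherwise the sink inherits the bounded input's parallelism.
    static int resolveSinkParallelism(int maxParallelism, int configured, int inputParallelism) {
        if (maxParallelism > 0) {
            return maxParallelism;
        }
        if (configured > 0) {
            return configured;
        }
        return inputParallelism;
    }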
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
index e9f9ee8..ee106f3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.codegen.sort.SortCodeGenerator
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RelExplainUtil, SortUtil}
import org.apache.flink.table.runtime.sort.SortOperator
import org.apache.flink.table.typeutils.BaseRowTypeInfo
@@ -114,12 +115,12 @@ class BatchExecSort(
val codeGen = new SortCodeGenerator(conf, keys, keyTypes, orders, nullsIsLast)
val reservedMemorySize = conf.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
val maxMemorySize = conf.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MAX_MEM) * TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MAX_MEM) * NodeResourceConfig.SIZE_IN_MB
val perRequestSize = conf.getConf.getInteger(
- TableConfigOptions.SQL_EXEC_PER_REQUEST_MEM) * TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_EXEC_PER_REQUEST_MEM) * NodeResourceConfig.SIZE_IN_MB
val operator = new SortOperator(
reservedMemorySize,
@@ -133,6 +134,6 @@ class BatchExecSort(
s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
BaseRowTypeInfo.of(outputType),
- input.getParallelism)
+ getResource.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
index 1fded57..b4c7674 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
@@ -169,8 +169,4 @@ class BatchExecSortAggregate(
aggOperatorName(aggregateNamePrefix + "SortAggregate")
}
- override def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int = {
- if (isFinal && grouping.length == 0) 1 else input.getParallelism
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
index 094df9b..f663fe8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
@@ -18,10 +18,10 @@
package org.apache.flink.table.plan.nodes.physical.batch
import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableConfigOptions}
+import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGeneratorContext
-import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator, HashAggCodeGenerator, SortAggCodeGenerator}
+import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator, SortAggCodeGenerator}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
@@ -100,8 +100,6 @@ abstract class BatchExecSortAggregateBase(
def getOperatorName: String
- def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int
-
override def translateToPlanInternal(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
val input = getInputNodes.get(0).translateToPlan(tableEnv)
@@ -126,6 +124,6 @@ abstract class BatchExecSortAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getParallelism(input, tableEnv.config))
+ getResource.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
index 76545a5..872f314 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
@@ -144,7 +144,7 @@ class BatchExecSortLimit(
getOperatorName,
operator,
inputType,
- if (isGlobal) 1 else input.getParallelism)
+ getResource.getParallelism)
}
private def getOperatorName = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index 3a8217c..e2212b1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.plan.nodes.ExpressionFormat
import org.apache.flink.table.plan.nodes.exec.ExecNode
+import org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, JoinUtil, SortUtil}
import org.apache.flink.table.runtime.join.{FlinkJoinType, SortMergeJoinOperator}
import org.apache.flink.table.types.logical.RowType
@@ -228,10 +229,10 @@ class BatchExecSortMergeJoin(
val condFunc = generateCondition(config, leftType, rightType)
val externalBufferMemory = config.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
val sortMemory = config.getConf.getInteger(
- TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * TableConfigOptions.SIZE_IN_MB
+ TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * NodeResourceConfig.SIZE_IN_MB
def newSortGen(originalKeys: Array[Int], t: RowType): SortCodeGenerator = {
val originalOrders = originalKeys.map(_ => true)
@@ -270,7 +271,7 @@ class BatchExecSortMergeJoin(
getOperatorName,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
- tableEnv.getConfig.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+ getResource.getParallelism)
}
private def estimateOutputSize(relNode: RelNode): Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
index 2e0a526..7568444 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
@@ -96,8 +96,4 @@ class BatchExecSortWindowAggregate(
val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
aggregateNamePrefix + "WindowSortAggregateBatchExec"
}
-
- override def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int = {
- if (isFinal && grouping.length == 0) 1 else input.getParallelism
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index b0323d8..c48e648 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -105,8 +105,6 @@ abstract class BatchExecSortWindowAggregateBase(
def getOperatorName: String
- def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): Int
-
override def translateToPlanInternal(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
val input = getInputNodes.get(0).translateToPlan(tableEnv)
@@ -140,6 +138,6 @@ abstract class BatchExecSortWindowAggregateBase(
getOperatorName,
operator,
BaseRowTypeInfo.of(outputType),
- getParallelism(input, tableEnv.config))
+ getResource.getParallelism)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index f25538f..ce0a826 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -18,8 +18,14 @@
package org.apache.flink.table.plan.nodes.physical.batch
+import java.{lang, util}
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.codegen.CodeGeneratorContext
@@ -29,7 +35,6 @@ import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.FlinkRelOptTable
import org.apache.flink.table.plan.util.ScanUtil
import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
@@ -37,6 +42,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
import java.util
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import scala.collection.JavaConversions._
@@ -82,7 +88,8 @@ class BatchExecTableSourceScan(
tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
val config = tableEnv.getConfig
val bts = tableSource.asInstanceOf[BatchTableSource[_]]
- val inputTransform = bts.getBoundedStream(tableEnv.streamEnv).getTransformation
+ val inputTransform = bts.getBoundedStream(tableEnv.execEnv).getTransformation
+ inputTransform.setParallelism(getResource.getParallelism)
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
@@ -108,7 +115,7 @@ class BatchExecTableSourceScan(
tableEnv.getRelBuilder
)
if (needInternalConversion) {
- ScanUtil.convertToInternalRow(
+ val conversionTransform = ScanUtil.convertToInternalRow(
CodeGeneratorContext(config),
inputTransform.asInstanceOf[StreamTransformation[Any]],
fieldIndexes,
@@ -117,6 +124,8 @@ class BatchExecTableSourceScan(
getTable.getQualifiedName,
config,
rowtimeExpression)
+ conversionTransform.setParallelism(getResource.getParallelism)
+ conversionTransform
} else {
inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
}
@@ -135,4 +144,13 @@ class BatchExecTableSourceScan(
tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
.getTypeClass.asInstanceOf[Class[_]])
}
+
+ def getSourceTransformation(
+ streamEnv: StreamExecutionEnvironment): StreamTransformation[_] = {
+ tableSource.asInstanceOf[BatchTableSource[_]].getBoundedStream(streamEnv).getTransformation
+ }
+
+ def getEstimatedRowCount: lang.Double = {
+ getCluster.getMetadataQuery.getRowCount(this)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
index d6f0dca..32dbcd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
@@ -79,7 +79,10 @@ class BatchExecValues(
getRowType,
tuples,
getRelTypeName)
- tableEnv.streamEnv.createInput(inputFormat, inputFormat.getProducedType).getTransformation
+ val transformation = tableEnv.streamEnv.createInput(inputFormat,
+ inputFormat.getProducedType).getTransformation
+ transformation.setParallelism(getResource.getParallelism)
+ transformation
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
index 475c0c6..2106d0b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode, ExecNodeVisitorImpl}
import org.apache.flink.table.plan.nodes.physical.batch._
+import org.apache.flink.table.plan.nodes.process.{DAGProcessContext, DAGProcessor}
import com.google.common.collect.{Maps, Sets}
import org.apache.calcite.rel.RelNode
@@ -82,9 +83,10 @@ import scala.collection.mutable
* ScanTableSource
* }}}
*/
-class DeadlockBreakupProcessor {
+class DeadlockBreakupProcessor extends DAGProcessor {
- def process(rootNodes: util.List[ExecNode[_, _]]): util.List[ExecNode[_, _]] = {
+ def process(rootNodes: util.List[ExecNode[_, _]],
+ context: DAGProcessContext): util.List[ExecNode[_, _]] = {
if (!rootNodes.forall(_.isInstanceOf[BatchExecNode[_]])) {
throw new TableException("Only BatchExecNode DAG is supported now")
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
index 0f5ecfc..4184e9c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
@@ -58,13 +58,15 @@ object ExecNodePlanDumper {
detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
withExecNodeId: Boolean = false,
withRetractTraits: Boolean = false,
- withOutputType: Boolean = false): String = {
+ withOutputType: Boolean = false,
+ withResource: Boolean = false): String = {
doConvertTreeToString(
node,
detailLevel = detailLevel,
withExecNodeId = withExecNodeId,
withRetractTraits = withRetractTraits,
- withOutputType = withOutputType)
+ withOutputType = withOutputType,
+ withResource = withResource)
}
/**
@@ -83,14 +85,16 @@ object ExecNodePlanDumper {
detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
withExecNodeId: Boolean = false,
withRetractTraits: Boolean = false,
- withOutputType: Boolean = false): String = {
+ withOutputType: Boolean = false,
+ withResource: Boolean = false): String = {
if (nodes.length == 1) {
return treeToString(
nodes.head,
detailLevel,
withExecNodeId = withExecNodeId,
withRetractTraits = withRetractTraits,
- withOutputType = withOutputType)
+ withOutputType = withOutputType,
+ withResource = withResource)
}
val reuseInfoBuilder = new ReuseInfoBuilder()
@@ -124,7 +128,8 @@ object ExecNodePlanDumper {
withRetractTraits = withRetractTraits,
withOutputType = withOutputType,
stopExplainNodes = Some(stopExplainNodes),
- reuseInfoMap = Some(reuseInfoMap))
+ reuseInfoMap = Some(reuseInfoMap),
+ withResource = withResource)
sb.append(reusePlan).append(System.lineSeparator)
if (isReuseNode) {
// update visit info after the reuse node visited
@@ -151,7 +156,8 @@ object ExecNodePlanDumper {
withRetractTraits: Boolean = false,
withOutputType: Boolean = false,
stopExplainNodes: Option[util.Set[ExecNode[_, _]]] = None,
- reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, Boolean)]] = None
+ reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, Boolean)]] = None,
+ withResource: Boolean = false
): String = {
// TODO refactor this part of code
// get ExecNode explain value by RelNode#explain now
@@ -164,7 +170,8 @@ object ExecNodePlanDumper {
withRetractTraits = withRetractTraits,
withOutputType = withOutputType,
stopExplainNodes = stopExplainNodes,
- reuseInfoMap = reuseInfoMap)
+ reuseInfoMap = reuseInfoMap,
+ withResource = withResource)
node.asInstanceOf[RelNode].explain(planWriter)
sw.toString
}
@@ -217,7 +224,8 @@ class NodeTreeWriterImpl(
withRetractTraits: Boolean = false,
withOutputType: Boolean = false,
stopExplainNodes: Option[util.Set[ExecNode[_, _]]] = None,
- reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, Boolean)]] = None)
+ reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, Boolean)]] = None,
+ withResource: Boolean = false)
extends RelWriterImpl(pw, explainLevel, false) {
require((stopExplainNodes.isEmpty && reuseInfoMap.isEmpty) ||
@@ -323,6 +331,10 @@ class NodeTreeWriterImpl(
printValues.add(Pair.of("__id__", rel.getId.toString))
}
+ if (withResource) {
+ printValues.add(Pair.of("resource", node.getResource))
+ }
+
if (withRetractTraits) {
rel match {
case streamRel: StreamPhysicalRel =>
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
new file mode 100644
index 0000000..5ecbb48
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource;
+
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Base for test with mock node list.
+ */
+public class MockNodeTestBase {
+
+ protected List<ExecNode> nodeList;
+
+ protected void updateNode(int index, ExecNode<?, ?> node) {
+ nodeList.set(index, node);
+ NodeResource resource = new NodeResource();
+ when(node.getResource()).thenReturn(resource);
+ when(node.toString()).thenReturn("id: " + index);
+ if (node instanceof BatchExecTableSourceScan) {
+ StreamTransformation transformation = mock(StreamTransformation.class);
+ when(((BatchExecTableSourceScan) node).getSourceTransformation(any())).thenReturn(transformation);
+ when(transformation.getMaxParallelism()).thenReturn(-1);
+ } else if (node instanceof BatchExecBoundedStreamScan) {
+ StreamTransformation transformation = mock(StreamTransformation.class);
+ when(((BatchExecBoundedStreamScan) node).getSourceTransformation()).thenReturn(transformation);
+ } else if (node instanceof BatchExecExchange) {
+ RelDistribution distribution = mock(RelDistribution.class);
+ when(distribution.getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
+ when(((BatchExecExchange) node).getDistribution()).thenReturn(distribution);
+ }
+ }
+
+ protected void createNodeList(int num) {
+ nodeList = new LinkedList<>();
+ for (int i = 0; i < num; i++) {
+ ExecNode<?, ?> node = mock(BatchExecCalc.class);
+ when(node.getInputNodes()).thenReturn(new ArrayList<>());
+ when(node.toString()).thenReturn("id: " + i);
+ nodeList.add(node);
+ }
+ }
+
+ protected void connect(int nodeIndex, int... inputNodeIndexes) {
+ List<ExecNode<?, ?>> inputNodes = new ArrayList<>(inputNodeIndexes.length);
+ for (int inputIndex : inputNodeIndexes) {
+ ExecNode<?, ?> input = nodeList.get(inputIndex);
+ inputNodes.add(input);
+ }
+ when(nodeList.get(nodeIndex).getInputNodes()).thenReturn(inputNodes);
+ if (inputNodeIndexes.length == 1 && nodeList.get(nodeIndex) instanceof SingleRel) {
+ when(((SingleRel) nodeList.get(nodeIndex)).getInput()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[0]));
+ } else if (inputNodeIndexes.length == 2 && nodeList.get(nodeIndex) instanceof BiRel) {
+ when(((BiRel) nodeList.get(nodeIndex)).getLeft()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[0]));
+ when(((BiRel) nodeList.get(nodeIndex)).getRight()).thenReturn((RelNode) nodeList.get(inputNodeIndexes[1]));
+ }
+ }
+ }
+}
+
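The helpers in MockNodeTestBase above are meant to be composed per test. A hypothetical usage from a subclass, building a two-node Source -> Calc chain (the indexes and mocks are illustrative):

    createNodeList(2);                                   // two mocked BatchExecCalc nodes
    updateNode(0, mock(BatchExecTableSourceScan.class)); // swap node 0 for a mocked scan
    connect(1, 0);                                       // node 1 reads node 0 as its single input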
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetterTest.java
new file mode 100644
index 0000000..9d54445
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetterTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.BatchTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecValues;
+import org.apache.flink.table.plan.nodes.resource.MockNodeTestBase;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for BatchFinalParallelismSetter.
+ */
+public class BatchFinalParallelismSetterTest extends MockNodeTestBase {
+
+ private StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ private BatchTableEnvironment tEnv;
+
+ @Before
+ public void setUp() {
+ tEnv = TableEnvironment.getBatchTableEnvironment(sEnv);
+ sEnv.setParallelism(21);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSource() {
+ /**
+ * 0, Source 1, Source 2, Values 4, Source 5, Source
+ * \ / / / /
+ * 3, Union
+ */
+ createNodeList(6);
+ BatchExecTableSourceScan scan0 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ when(((BatchExecTableSourceScan) nodeList.get(0)).getSourceTransformation(any()).getMaxParallelism()).thenReturn(5);
+ updateNode(1, mock(BatchExecTableSourceScan.class));
+ updateNode(2, mock(BatchExecValues.class));
+ updateNode(3, mock(BatchExecUnion.class));
+ updateNode(4, mock(BatchExecBoundedStreamScan.class));
+ when(((BatchExecBoundedStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(7);
+ updateNode(5, mock(BatchExecBoundedStreamScan.class));
+ connect(3, 0, 1, 2, 4, 5);
+ Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = BatchFinalParallelismSetter.calculate(tEnv, Collections.singletonList(nodeList.get(3)));
+ assertEquals(4, finalParallelismNodeMap.size());
+ assertEquals(5, finalParallelismNodeMap.get(nodeList.get(0)).intValue());
+ assertEquals(1, finalParallelismNodeMap.get(nodeList.get(2)).intValue());
+ assertEquals(7, finalParallelismNodeMap.get(nodeList.get(4)).intValue());
+ assertEquals(21, finalParallelismNodeMap.get(nodeList.get(5)).intValue());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testExchange() {
+ /**
+ * 0, Source 1, Source 10, Source
+ * | | |
+ * 2, Exchange 3, Exchange 8, Source 9, Exchange
+ * | | \ /
+ * 4, Calc 5, Calc 7, Join
+ * \ / /
+ * 6, Union
+ */
+ createNodeList(11);
+ BatchExecTableSourceScan scan0 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ when(((BatchExecTableSourceScan) nodeList.get(0)).getSourceTransformation(any()).getMaxParallelism()).thenReturn(5);
+ BatchExecExchange execExchange4 = mock(BatchExecExchange.class, RETURNS_DEEP_STUBS);
+ when(execExchange4.getDistribution().getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
+ updateNode(2, execExchange4);
+ BatchExecExchange execExchange3 = mock(BatchExecExchange.class, RETURNS_DEEP_STUBS);
+ updateNode(3, execExchange3);
+ when(execExchange3.getDistribution().getType()).thenReturn(RelDistribution.Type.SINGLETON);
+ BatchExecExchange execExchange5 = mock(BatchExecExchange.class, RETURNS_DEEP_STUBS);
+ updateNode(9, execExchange5);
+ when(execExchange5.getDistribution().getType()).thenReturn(RelDistribution.Type.SINGLETON);
+ connect(2, 0);
+ connect(4, 2);
+ connect(3, 1);
+ connect(5, 3);
+ connect(6, 4, 5, 7);
+ connect(7, 8, 9);
+ connect(9, 10);
+ Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = BatchFinalParallelismSetter.calculate(tEnv, Collections.singletonList(nodeList.get(6)));
+ assertEquals(3, finalParallelismNodeMap.size());
+ assertEquals(5, finalParallelismNodeMap.get(nodeList.get(0)).intValue());
+ assertEquals(1, finalParallelismNodeMap.get(nodeList.get(5)).intValue());
+ assertEquals(1, finalParallelismNodeMap.get(nodeList.get(7)).intValue());
+ }
+}
+
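The expectations in BatchFinalParallelismSetterTest encode the "final" (non-negotiable) parallelism rules: a table source is pinned to its transformation's max parallelism, Values is pinned to 1, a bounded-stream scan takes its source transformation's parallelism (falling back to the environment's), and nodes below a singleton Exchange are pinned to 1. A hedged Java sketch of those rules (the enum and signature are invented for illustration; returning null means the node stays negotiable):

    static Integer pinnedParallelism(NodeKind kind, int sourceMaxParallelism,
            int boundedStreamParallelism, int envParallelism, boolean belowSingletonExchange) {
        switch (kind) {
            case VALUES:
                return 1;                 // Values always runs with parallelism 1
            case TABLE_SOURCE_SCAN:
                return sourceMaxParallelism > 0 ? sourceMaxParallelism : null;
            case BOUNDED_STREAM_SCAN:
                return boundedStreamParallelism > 0 ? boundedStreamParallelism : envParallelism;
            default:
                return belowSingletonExchange ? 1 : null;
        }
    }

    enum NodeKind { VALUES, TABLE_SOURCE_SCAN, BOUNDED_STREAM_SCAN, OTHER }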
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculatorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculatorTest.java
new file mode 100644
index 0000000..1ce39ca
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculatorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfigOptions;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link BatchShuffleStageParallelismCalculator}.
+ */
+public class BatchShuffleStageParallelismCalculatorTest {
+
+ private Configuration tableConf;
+ private BatchExecTableSourceScan tableSourceScan = mock(BatchExecTableSourceScan.class);
+ private int envParallelism = 5;
+
+ @Before
+ public void setUp() {
+ tableConf = new Configuration();
+ tableConf.setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, NodeResourceConfig.InferMode.ONLY_SOURCE.toString());
+ tableConf.setLong(TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION, 100);
+ tableConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 50);
+ when(tableSourceScan.getEstimatedRowCount()).thenReturn(3000d);
+ }
+
+ @Test
+ public void testOnlySource() {
+ ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
+ when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(tableSourceScan)));
+ new BatchShuffleStageParallelismCalculator(tableConf, envParallelism).calculate(shuffleStage0);
+ verify(shuffleStage0).setParallelism(30, false);
+ }
+
+ @Test
+ public void testSourceAndCalc() {
+ ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
+ BatchExecCalc calc = mock(BatchExecCalc.class);
+ when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(tableSourceScan, calc)));
+ new BatchShuffleStageParallelismCalculator(tableConf, envParallelism).calculate(shuffleStage0);
+ verify(shuffleStage0).setParallelism(30, false);
+ }
+
+ @Test
+ public void testNoSource() {
+ ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
+ BatchExecCalc calc = mock(BatchExecCalc.class);
+ when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(calc)));
+ new BatchShuffleStageParallelismCalculator(tableConf, envParallelism).calculate(shuffleStage0);
+ verify(shuffleStage0).setParallelism(50, false);
+ }
+
+ @Test
+ public void testEnvParallelism() {
+ tableConf.setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, NodeResourceConfig.InferMode.NONE.toString());
+ tableConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, -1);
+ ShuffleStage shuffleStage0 = mock(ShuffleStage.class);
+ BatchExecCalc calc = mock(BatchExecCalc.class);
+ when(shuffleStage0.getExecNodeSet()).thenReturn(getNodeSet(Arrays.asList(tableSourceScan, calc)));
+ new BatchShuffleStageParallelismCalculator(tableConf, envParallelism).calculate(shuffleStage0);
+ verify(shuffleStage0).setParallelism(5, false);
+ }
+
+ private Set<ExecNode<?, ?>> getNodeSet(List<ExecNode<?, ?>> nodeList) {
+ Set<ExecNode<?, ?>> nodeSet = new HashSet<>();
+ nodeSet.addAll(nodeList);
+ return nodeSet;
+ }
+}
+
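These tests pin down the inference arithmetic: in ONLY_SOURCE mode a shuffle stage containing a source gets ceil(estimatedRowCount / rowsPerPartition) = ceil(3000 / 100) = 30, a stage without a source falls back to the configured default parallelism (50), and when inference is NONE and the default is unset (-1) the environment parallelism (5) wins. A hedged Java sketch of that arithmetic (the function is invented for illustration; the real logic lives in BatchShuffleStageParallelismCalculator and NodeResourceConfig):

    static int inferStageParallelism(boolean inferFromSource, Double estimatedRowCount,
            long rowsPerPartition, int defaultParallelism, int envParallelism) {
        if (inferFromSource && estimatedRowCount != null) {
            // e.g. ceil(3000 / 100) = 30, matching testOnlySource above
            return (int) Math.max(1, Math.ceil(estimatedRowCount / rowsPerPartition));
        }
        // no source (or inference disabled): configured default, else the environment's
        return defaultParallelism > 0 ? defaultParallelism : envParallelism;
    }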
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGeneratorTest.java
new file mode 100644
index 0000000..48ce447
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGeneratorTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.plan.nodes.resource.MockNodeTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for {@link ShuffleStageGenerator}.
+ */
+public class ShuffleStageGeneratorTest extends MockNodeTestBase {
+
+ private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap;
+
+ @Before
+ public void setUp() {
+ finalParallelismNodeMap = new HashMap<>();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testGenerateShuffleStags() {
+ /**
+ *
+ * 0, Source 1, Source
+ * \ /
+ * 2, Union
+ * / \
+ * 3, Calc 4, Calc
+ * | |
+ * 5, Exchange 6, Exchange
+ * \ /
+ * 7, Join
+ * |
+ * 8, Calc
+ */
+ createNodeList(9);
+ updateNode(2, mock(BatchExecUnion.class));
+ updateNode(5, mock(BatchExecExchange.class));
+ updateNode(6, mock(BatchExecExchange.class));
+ connect(2, 0, 1);
+ connect(3, 2);
+ connect(4, 2);
+ connect(5, 3);
+ connect(6, 4);
+ connect(7, 5, 6);
+ connect(8, 7);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(8)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 7, 8);
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 3, 4);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultiOutput() {
+ /**
+ *
+ * 0, Source 2, Source 4, Source 6, Source
+ * | | | |
+ * 1, Calc 3, Calc 5, Calc 7, Exchange
+ * \ / \ / \ /
+ * 8, Join 9, Join 10, Join
+ * \ / \ /
+ * \ 12, Exchange \ /
+ * \ / \ /
+ * 11, Join 13, Union
+ * \ |
+ * 15, Exchange 14, Calc
+ * \ /
+ * 16, Join
+ */
+ createNodeList(17);
+ updateNode(7, mock(BatchExecExchange.class));
+ updateNode(12, mock(BatchExecExchange.class));
+ updateNode(13, mock(BatchExecUnion.class));
+ updateNode(15, mock(BatchExecExchange.class));
+ connect(1, 0);
+ connect(3, 2);
+ connect(5, 4);
+ connect(7, 6);
+ connect(8, 1, 3);
+ connect(9, 3, 5);
+ connect(10, 5, 7);
+ connect(12, 9);
+ connect(11, 8, 12);
+ connect(13, 9, 10);
+ connect(14, 13);
+ connect(15, 11);
+ connect(16, 15, 14);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(16)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 8, 3, 2, 9, 5, 4, 10, 11, 14, 16);
+ assertSameShuffleStage(nodeShuffleStageMap, 6);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWithFinalParallelism() {
+ /**
+ *
+ * 0, Source 2, Source
+ * | |
+ * 1, Calc 3, Calc 6, Source
+ * \ / /
+ * 4, Union
+ * |
+ * 5, Calc
+ */
+ createNodeList(7);
+ ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+ ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ finalParallelismNodeMap.put(scan0, 10);
+ updateNode(2, scan1);
+ finalParallelismNodeMap.put(scan1, 11);
+ updateNode(4, mock(BatchExecUnion.class));
+ updateNode(5, mock(BatchExecCalc.class));
+ updateNode(6, mock(BatchExecTableSourceScan.class));
+ connect(1, 0);
+ connect(3, 2);
+ connect(4, 1, 3, 6);
+ connect(5, 4);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+ assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWithFinalParallelism1() {
+ /**
+ *
+ * 0, Source 2, Source
+ * | |
+ * 1, Calc 3, Calc
+ * \ /
+ * 4, Union
+ * |
+ * 5, Calc
+ */
+ createNodeList(7);
+ ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+ ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ finalParallelismNodeMap.put(scan0, 10);
+ updateNode(2, scan1);
+ finalParallelismNodeMap.put(scan1, 11);
+ updateNode(4, mock(BatchExecUnion.class));
+ ExecNode<?, ?> calc = mock(BatchExecCalc.class);
+ updateNode(5, calc);
+ finalParallelismNodeMap.put(calc, 12);
+ connect(1, 0);
+ connect(3, 2);
+ connect(4, 1, 3);
+ connect(5, 4);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+ assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
+ assertSameShuffleStage(nodeShuffleStageMap, 5);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWithFinalParallelism2() {
+ /**
+ *
+ * 0, Source 2, Source
+ * | |
+ * | 3, Exchange
+ * | |
+ * 1, Calc 4, Calc
+ * \ /
+ * 5, Union
+ * |
+ * 6, Calc
+ */
+ createNodeList(7);
+ ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+ ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ finalParallelismNodeMap.put(scan0, 10);
+ updateNode(2, scan1);
+ finalParallelismNodeMap.put(scan1, 11);
+ updateNode(3, mock(BatchExecExchange.class));
+ ExecNode<?, ?> calc = mock(BatchExecCalc.class);
+ updateNode(4, calc);
+ finalParallelismNodeMap.put(calc, 1);
+ updateNode(5, mock(BatchExecUnion.class));
+ connect(1, 0);
+ connect(3, 2);
+ connect(4, 3);
+ connect(5, 1, 4);
+ connect(6, 5);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(6)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 6);
+ assertSameShuffleStage(nodeShuffleStageMap, 2);
+ assertSameShuffleStage(nodeShuffleStageMap, 4);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWithFinalParallelism3() {
+ /**
+ *
+ * 0, Source 2, Source
+ * | |
+ * 1, Calc 3, Calc 6, Source 7,Source
+ * \ / / /
+ * 4, Union
+ * |
+ * 5, Calc
+ */
+ createNodeList(8);
+ ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+ ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ finalParallelismNodeMap.put(scan0, 11);
+ updateNode(2, scan1);
+ finalParallelismNodeMap.put(scan1, 5);
+ ExecNode<?, ?> union4 = mock(BatchExecUnion.class);
+ updateNode(4, union4);
+ finalParallelismNodeMap.put(union4, 5);
+ updateNode(5, mock(BatchExecCalc.class));
+ updateNode(6, mock(BatchExecTableSourceScan.class));
+ updateNode(7, mock(BatchExecTableSourceScan.class));
+ connect(1, 0);
+ connect(3, 2);
+ connect(4, 1, 3, 6, 7);
+ connect(5, 4);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+ assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5, 7);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWithFinalParallelism4() {
+ /**
+ *
+ * 0, Source 2, Source
+ * | |
+ * 1, Calc 3, Calc
+ * \ /
+ * 4, Union
+ * |
+ * 5, Calc
+ */
+ createNodeList(6);
+ ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+ ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+ updateNode(0, scan0);
+ finalParallelismNodeMap.put(scan0, 11);
+ updateNode(2, scan1);
+ finalParallelismNodeMap.put(scan1, 5);
+ ExecNode<?, ?> union4 = mock(BatchExecUnion.class);
+ updateNode(4, union4);
+ finalParallelismNodeMap.put(union4, 3);
+ updateNode(5, mock(BatchExecCalc.class));
+ connect(1, 0);
+ connect(3, 2);
+ connect(4, 1, 3);
+ connect(5, 4);
+
+ Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
+
+ assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+ assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
+ assertSameShuffleStage(nodeShuffleStageMap, 5);
+ }
+
+ private void assertSameShuffleStage(Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap, int ... nodeIndexes) {
+ Set<ExecNode<?, ?>> nodeSet = new HashSet<>();
+ for (int index : nodeIndexes) {
+ nodeSet.add(nodeList.get(index));
+ }
+ for (int index : nodeIndexes) {
+ assertNotNull("shuffleStage should not be null. node index: " + index, nodeShuffleStageMap.get(nodeList.get(index)));
+ assertEquals("node index: " + index, nodeSet, nodeShuffleStageMap.get(nodeList.get(index)).getExecNodeSet());
+ }
+ }
+}
+
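What these cases assert, in one rule: nodes joined by an edge that does not cross an Exchange land in the same ShuffleStage, and pinned final parallelisms can additionally keep stages apart. A minimal union-find sketch of the Exchange-boundary part only (types and names are invented for illustration; it deliberately ignores the final-parallelism splitting the later cases exercise):

    import java.util.*;

    final class ToyStageGrouper {
        private final Map<String, String> parent = new HashMap<>();

        // Union-find with path compression over node names.
        private String find(String n) {
            parent.putIfAbsent(n, n);
            String p = parent.get(n);
            if (!p.equals(n)) {
                p = find(p);
                parent.put(n, p);
            }
            return p;
        }

        // Edges are (node, input) pairs; exchanges holds the Exchange node names.
        Map<String, String> stages(List<String[]> edges, Set<String> exchanges) {
            for (String[] e : edges) {
                if (!exchanges.contains(e[0]) && !exchanges.contains(e[1])) {
                    parent.put(find(e[0]), find(e[1])); // same stage: union endpoints
                }
            }
            Map<String, String> result = new TreeMap<>();
            for (String n : new ArrayList<>(parent.keySet())) {
                result.put(n, find(n)); // node -> representative stage id
            }
            return result;
        }
    }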
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.xml
new file mode 100644
index 0000000..a0ad41b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.xml
@@ -0,0 +1,185 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testConfigSourceParallelism[NONE]">
+ <Resource name="sql">
+ <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1}])
+ +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+ +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
+ +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+ +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+ +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=100}])
+ +- Calc(select=[c, a], resource=[{parallelism=100}])
+ +- TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=100}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConfigSourceParallelism[ONLY_SOURCE]">
+ <Resource name="sql">
+ <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=5}])
+                  +- Calc(select=[c, a], resource=[{parallelism=5}])
+                     +- TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRangePartition[NONE]">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM Table5 where d < 100 order by e]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Sort(orderBy=[e ASC], resource=[{parallelism=18}])
++- Exchange(distribution=[range[e ASC]], resource=[{parallelism=18}])
+   +- Calc(select=[d, e, f, g, h], where=[<(d, 100)], resource=[{parallelism=18}])
+      +- TableSourceScan(table=[[Table5, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRangePartition[ONLY_SOURCE]">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM Table5 where d < 100 order by e]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Sort(orderBy=[e ASC], resource=[{parallelism=8}])
++- Exchange(distribution=[range[e ASC]], resource=[{parallelism=8}])
+   +- Calc(select=[d, e, f, g, h], where=[<(d, 100)], resource=[{parallelism=8}])
+      +- TableSourceScan(table=[[Table5, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=8}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSourcePartitionMaxNum[NONE]">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM table3]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSortLimit[NONE]">
+ <Resource name="sql">
+ <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
+                  +- Calc(select=[c, a], resource=[{parallelism=18}])
+                     +- TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSortLimit[ONLY_SOURCE]">
+ <Resource name="sql">
+ <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS sum$0], resource=[{parallelism=5}])
+                  +- Calc(select=[c, a], resource=[{parallelism=5}])
+                     +- TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUnionQuery[NONE]">
+ <Resource name="sql">
+ <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), Table5 WHERE b = e group by g]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[sum_a, g], resource=[{parallelism=18}])
++- HashAggregate(isMerge=[true], groupBy=[g], select=[g, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+   +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
+      +- LocalHashAggregate(groupBy=[g], select=[g, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
+         +- Calc(select=[g, a], resource=[{parallelism=18}])
+            +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, e, g], build=[left], resource=[{parallelism=18}])
+               :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
+               :  +- Union(all=[true], union=[a, b], resource=[{parallelism=-1}])
+               :     :- Calc(select=[a, b], resource=[{parallelism=18}])
+               :     :  +- TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+               :     +- Calc(select=[a, b], resource=[{parallelism=18}])
+               :        +- TableSourceScan(table=[[table4, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+               +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
+                  +- Calc(select=[e, g], resource=[{parallelism=18}])
+                     +- TableSourceScan(table=[[Table5, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSourcePartitionMaxNum[ONLY_SOURCE]">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM table3]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=2}])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUnionQuery[ONLY_SOURCE]">
+ <Resource name="sql">
+ <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), Table5 WHERE b = e group by g]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[sum_a, g], resource=[{parallelism=18}])
++- HashAggregate(isMerge=[true], groupBy=[g], select=[g, Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+   +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
+      +- LocalHashAggregate(groupBy=[g], select=[g, Partial_SUM(a) AS sum$0], resource=[{parallelism=18}])
+         +- Calc(select=[g, a], resource=[{parallelism=18}])
+            +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, e, g], build=[left], resource=[{parallelism=18}])
+               :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
+               :  +- Union(all=[true], union=[a, b], resource=[{parallelism=-1}])
+               :     :- Calc(select=[a, b], resource=[{parallelism=5}])
+               :     :  +- TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+               :     +- Calc(select=[a, b], resource=[{parallelism=5}])
+               :        +- TableSourceScan(table=[[table4, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+               +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
+                  +- Calc(select=[e, g], resource=[{parallelism=8}])
+                     +- TableSourceScan(table=[[Table5, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], resource=[{parallelism=8}])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
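A note on the numbers in these expected plans: under ONLY_SOURCE mode the source parallelism follows from the table stats declared in BatchExecResourceTest (5,000,000 rows for table3, 8,000,000 for Table5) and SQL_RESOURCE_INFER_ROWS_PER_PARTITION (set to 1,000,000), capped by SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX. A minimal sketch of that arithmetic, assuming this shape for the inference (the helper below is illustrative, not the actual NodeResourceConfig API):

    // Sketch only: infer a source's parallelism from its row count.
    def inferSourceParallelism(rowCount: Long, rowsPerPartition: Long, maxParallelism: Int): Int = {
      val byRows = math.ceil(rowCount.toDouble / rowsPerPartition).toInt
      math.max(1, math.min(byRows, maxParallelism))
    }

With the test configuration this gives 5 for table3 and 8 for Table5, matching the ONLY_SOURCE plans above; testSourcePartitionMaxNum lowers the cap to 2, hence parallelism=2. In testUnionQuery the table4 scan (only 100 rows) also shows 5, consistent with its branch sharing one shuffle stage with the table3 branch through the Union, as exercised by ShuffleStageGeneratorTest.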
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.scala
new file mode 100644
index 0000000..8c55822
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableConfigOptions, Types}
+import org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import java.util.{Arrays => JArrays, Collection => JCollection}
+
+@RunWith(classOf[Parameterized])
+class BatchExecResourceTest(inferMode: String) extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def before(): Unit = {
+    util.getTableEnv.getConfig.getConf.setString(
+      TableConfigOptions.SQL_RESOURCE_INFER_MODE,
+      inferMode
+    )
+    val table3Stats = new TableStats(5000000)
+    util.addTableSource("table3",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING),
+      Array("a", "b", "c"), FlinkStatistic.builder().tableStats(table3Stats).build())
+    val table5Stats = new TableStats(8000000)
+    util.addTableSource("Table5",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.INT, Types.STRING, Types.LONG),
+      Array("d", "e", "f", "g", "h"), FlinkStatistic.builder().tableStats(table5Stats).build())
+    BatchExecResourceTest.setResourceConfig(util.getTableEnv.getConfig)
+  }
+
+  @Test
+  def testSourcePartitionMaxNum(): Unit = {
+    util.getTableEnv.getConfig.getConf.setInteger(
+      TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX,
+      2
+    )
+    val sqlQuery = "SELECT * FROM table3"
+    util.verifyResource(sqlQuery)
+  }
+
+  @Test
+  def testSortLimit(): Unit = {
+    val sqlQuery = "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2"
+    util.verifyResource(sqlQuery)
+  }
+
+  @Test
+  def testConfigSourceParallelism(): Unit = {
+    util.getTableEnv.getConfig.getConf.setInteger(
+      TableConfigOptions.SQL_RESOURCE_SOURCE_PARALLELISM, 100)
+    val sqlQuery = "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2"
+    util.verifyResource(sqlQuery)
+  }
+
+  @Test
+  // TODO: check this when range partition is added.
+  def testRangePartition(): Unit = {
+    util.getTableEnv.getConfig.getConf.setBoolean(
+      TableConfigOptions.SQL_EXEC_SORT_RANGE_ENABLED,
+      true)
+    val sqlQuery = "SELECT * FROM Table5 where d < 100 order by e"
+    util.verifyResource(sqlQuery)
+  }
+
+  @Test
+  def testUnionQuery(): Unit = {
+    val statsOfTable4 = new TableStats(100L)
+    util.addTableSource("table4",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING),
+      Array("a", "b", "c"),
+      FlinkStatistic.builder().tableStats(statsOfTable4).build())
+
+    val sqlQuery = "SELECT sum(a) as sum_a, g FROM " +
+      "(SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), Table5 " +
+      "WHERE b = e group by g"
+    util.verifyResource(sqlQuery)
+  }
+}
+
+object BatchExecResourceTest {
+
+  @Parameterized.Parameters(name = "{0}")
+  def parameters(): JCollection[String] = JArrays.asList(
+    NodeResourceConfig.InferMode.NONE.toString,
+    NodeResourceConfig.InferMode.ONLY_SOURCE.toString)
+
+  def setResourceConfig(tableConfig: TableConfig): Unit = {
+    tableConfig.getConf.setInteger(
+      TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM,
+      18)
+    tableConfig.getConf.setInteger(
+      TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX,
+      1000)
+    tableConfig.getConf.setLong(
+      TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION,
+      1000000
+    )
+  }
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
index 1e6c3cf..e32fce4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -51,7 +51,9 @@ class TableScanITCase extends BatchTestBase {
row("Mary", new JLong(1L), new JInt(1)),
row("Bob", new JLong(2L), new JInt(3))
)
- execEnv.fromCollection(data).returns(getReturnType)
+ val dataStream = execEnv.fromCollection(data).returns(getReturnType)
+ dataStream.getTransformation.setMaxParallelism(1)
+ dataStream
}
   override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
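A plausible reading of this two-line change (the same guard appears in testTableSources.scala below): now that the planner assigns parallelism to batch nodes, an inferred source parallelism above 1 would be invalid for a non-parallel collection source, so the test pins the transformation's max parallelism to 1:

    // Guard for non-parallel sources once parallelism setting is in play (sketch,
    // using the names from the test above).
    val dataStream = execEnv.fromCollection(data).returns(getReturnType)
    dataStream.getTransformation.setMaxParallelism(1)
    dataStream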
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index fda5a3c..25fdc74 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -305,6 +305,18 @@ abstract class TableTestUtil(test: TableTestBase) {
assertEqualsOrExpand("planAfter", actual.toString, expand = false)
}
+ def verifyResource(sql: String): Unit = {
+ assertEqualsOrExpand("sql", sql)
+ val table = getTableEnv.sqlQuery(sql)
+ doVerifyPlan(
+ table,
+ explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+ withRowType = false,
+ withRetractTraits = false,
+ printResource = true,
+ printPlanBefore = false)
+ }
+
def doVerifyPlan(
table: Table,
explainLevel: SqlExplainLevel,
@@ -323,13 +335,15 @@ abstract class TableTestUtil(test: TableTestBase) {
explainLevel: SqlExplainLevel,
withRowType: Boolean,
withRetractTraits: Boolean,
- printPlanBefore: Boolean): Unit = {
+ printPlanBefore: Boolean,
+ printResource: Boolean = false): Unit = {
val relNode = table.asInstanceOf[TableImpl].getRelNode
val optimizedPlan = getOptimizedPlan(
Array(relNode),
explainLevel,
withRetractTraits = withRetractTraits,
- withRowType = withRowType)
+ withRowType = withRowType,
+ withResource = printResource)
if (printPlanBefore) {
val planBefore = SystemUtils.LINE_SEPARATOR +
@@ -393,7 +407,8 @@ abstract class TableTestUtil(test: TableTestBase) {
relNodes: Array[RelNode],
explainLevel: SqlExplainLevel,
withRetractTraits: Boolean,
- withRowType: Boolean): String = {
+ withRowType: Boolean,
+ withResource: Boolean = false): String = {
require(relNodes.nonEmpty)
val tEnv = getTableEnv
val optimizedRels = tEnv.optimize(relNodes)
@@ -405,7 +420,8 @@ abstract class TableTestUtil(test: TableTestBase) {
optimizedNodes,
detailLevel = explainLevel,
withRetractTraits = withRetractTraits,
- withOutputType = withRowType)
+ withOutputType = withRowType,
+ withResource = withResource)
case _ =>
optimizedRels.map { rel =>
FlinkRelOptUtil.toString(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index c40f457..a6f744a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -50,7 +50,9 @@ class TestTableSourceWithTime[T](
}
   override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[T] = {
- streamEnv.fromCollection(values, returnType)
+ val dataStream = streamEnv.fromCollection(values, returnType)
+ dataStream.getTransformation.setMaxParallelism(1)
+ dataStream
}
   override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index a0c3ec2..a693637 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -105,10 +105,17 @@ public class TableConfigOptions {
     // Resource Options
     // ------------------------------------------------------------------------
-    /**
-     * How many Bytes per MB.
-     */
-    public static final long SIZE_IN_MB = 1024L * 1024;
+    public static final ConfigOption<String> SQL_RESOURCE_INFER_MODE =
+            key("sql.resource.infer.mode")
+                    .defaultValue("NONE")
+                    .withDescription("Sets the resource inference mode according to statistics. " +
+                            "Only NONE or ONLY_SOURCE can be set.\n" +
+                            "If set to NONE, the parallelism and memory of all nodes are taken from the config.\n" +
+                            "If set to ONLY_SOURCE, only the source parallelism is inferred from statistics.\n");
+
+    public static final ConfigOption<Integer> SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX =
+            key("sql.resource.infer.source.parallelism.max")
+                    .defaultValue(1000)
+                    .withDescription("Sets the max inferred parallelism for source operators.");
     public static final ConfigOption<Integer> SQL_RESOURCE_DEFAULT_PARALLELISM =
             key("sql.resource.default.parallelism")
@@ -116,6 +123,18 @@ public class TableConfigOptions {
.withDescription("Default parallelism
of the job. If any node do not have special parallelism, use it." +
"Its default value is
the num of cpu cores in the client host.");
+    public static final ConfigOption<Integer> SQL_RESOURCE_SOURCE_PARALLELISM =
+            key("sql.resource.source.parallelism")
+                    .defaultValue(-1)
+                    .withDescription("Sets the source parallelism when " + SQL_RESOURCE_INFER_MODE.key() +
+                            " is NONE. If it is not set, " + SQL_RESOURCE_DEFAULT_PARALLELISM.key() +
+                            " is used as the source parallelism.");
+
+    public static final ConfigOption<Integer> SQL_RESOURCE_SINK_PARALLELISM =
+            key("sql.resource.sink.parallelism")
+                    .defaultValue(-1)
+                    .withDescription("Sets the sink parallelism. If it is not set, " +
+                            "sink nodes chain with their upstream nodes where possible.");
+
     public static final ConfigOption<Integer> SQL_RESOURCE_EXTERNAL_BUFFER_MEM =
key("sql.resource.external-buffer.memory.mb")
.defaultValue(10)
@@ -225,10 +244,4 @@ public class TableConfigOptions {
.defaultValue(1000000L)
.withDescription("Sets how many rows
one partition processes. We will infer parallelism according " +
"to input row count.");
-
-    public static final ConfigOption<Integer> SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MAX =
-            key("sql.resource.infer.operator.parallelism.max")
-                    .defaultValue(800)
-                    .withDescription("Sets max parallelism for all operators.");
-
}
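Taken together, a minimal sketch of how these options combine from user code, mirroring the knobs exercised by BatchExecResourceTest above (the table environment setup itself is assumed):

    // Sketch: fix the default parallelism, infer only source parallelism from
    // statistics, and cap what the inference may choose.
    val conf = tableEnv.getConfig.getConf // tableEnv: a blink batch table environment (assumed)
    conf.setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, "ONLY_SOURCE")
    conf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 18)
    conf.setInteger(TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX, 1000)
    conf.setLong(TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION, 1000000)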