This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f5425f  [FLINK-12801][table-planner-blink] Set parallelism for batch SQL
6f5425f is described below

commit 6f5425fc9798510fef33ccd7bb81d4b9f59bffa5
Author: 姬平 <[email protected]>
AuthorDate: Tue Jun 11 18:22:32 2019 +0800

    [FLINK-12801][table-planner-blink] Set parallelism for batch SQL
    
    This closes #8690
---
 .../flink/table/api/PlannerConfigOptions.java      |   6 +
 .../table/plan/nodes/exec/NodeResourceConfig.java  |  81 -----
 .../plan/nodes/process/DAGProcessContext.java      |  42 +++
 .../table/plan/nodes/process/DAGProcessor.java     |  34 +++
 .../table/plan/nodes/resource/NodeResource.java    |  44 +++
 .../parallelism/BatchFinalParallelismSetter.java   | 112 +++++++
 .../parallelism/BatchParallelismProcessor.java     |  68 +++++
 .../BatchShuffleStageParallelismCalculator.java    |  93 ++++++
 .../batch/parallelism/NodeResourceConfig.java      | 108 +++++++
 .../resource/batch/parallelism/ShuffleStage.java   |  78 +++++
 .../batch/parallelism/ShuffleStageGenerator.java   | 148 ++++++++++
 .../flink/table/api/BatchTableEnvironment.scala    |  10 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   2 +-
 .../apache/flink/table/api/TableEnvironment.scala  |   6 +-
 .../table/plan/metadata/FlinkRelMdRowCount.scala   |   5 +-
 .../flink/table/plan/nodes/exec/ExecNode.scala     |  12 +-
 .../batch/BatchExecBoundedStreamScan.scala         |   8 +-
 .../plan/nodes/physical/batch/BatchExecCalc.scala  |   2 +-
 .../nodes/physical/batch/BatchExecCorrelate.scala  |   2 +-
 .../nodes/physical/batch/BatchExecExpand.scala     |   2 +-
 .../physical/batch/BatchExecHashAggregate.scala    |   8 -
 .../batch/BatchExecHashAggregateBase.scala         |  11 +-
 .../batch/BatchExecHashWindowAggregateBase.scala   |   7 +-
 .../plan/nodes/physical/batch/BatchExecLimit.scala |   2 +-
 .../batch/BatchExecLocalHashAggregate.scala        |   3 -
 .../batch/BatchExecLocalSortAggregate.scala        |   3 -
 .../batch/BatchExecLocalSortWindowAggregate.scala  |   6 -
 .../nodes/physical/batch/BatchExecLookupJoin.scala |   4 +-
 .../physical/batch/BatchExecOverAggregate.scala    |   5 +-
 .../plan/nodes/physical/batch/BatchExecRank.scala  |   2 +-
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  18 +-
 .../plan/nodes/physical/batch/BatchExecSort.scala  |   9 +-
 .../physical/batch/BatchExecSortAggregate.scala    |   4 -
 .../batch/BatchExecSortAggregateBase.scala         |   8 +-
 .../nodes/physical/batch/BatchExecSortLimit.scala  |   2 +-
 .../physical/batch/BatchExecSortMergeJoin.scala    |   7 +-
 .../batch/BatchExecSortWindowAggregate.scala       |   4 -
 .../batch/BatchExecSortWindowAggregateBase.scala   |   4 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  24 +-
 .../nodes/physical/batch/BatchExecValues.scala     |   5 +-
 .../plan/reuse/DeadlockBreakupProcessor.scala      |   6 +-
 .../flink/table/plan/util/ExecNodePlanDumper.scala |  28 +-
 .../plan/nodes/resource/MockNodeTestBase.java      |  92 ++++++
 .../BatchFinalParallelismSetterTest.java           | 125 ++++++++
 ...BatchShuffleStageParallelismCalculatorTest.java | 100 +++++++
 .../parallelism/ShuffleStageGeneratorTest.java     | 326 +++++++++++++++++++++
 .../plan/nodes/resource/BatchExecResourceTest.xml  | 185 ++++++++++++
 .../nodes/resource/BatchExecResourceTest.scala     | 124 ++++++++
 .../table/runtime/batch/sql/TableScanITCase.scala  |   4 +-
 .../apache/flink/table/util/TableTestBase.scala    |  24 +-
 .../apache/flink/table/util/testTableSources.scala |   4 +-
 .../apache/flink/table/api/TableConfigOptions.java |  33 ++-
 52 files changed, 1864 insertions(+), 186 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index c2bdf85..9441ede 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -126,4 +126,10 @@ public class PlannerConfigOptions {
                                        .defaultValue(false)
                                        .withDescription("Disable union all as 
breakpoint when constructing RelNodeBlock");
 
+       public static final ConfigOption<Long> SQL_OPTIMIZER_ROWS_PER_LOCALAGG =
+                       key("sql.optimizer.rows-per-localAgg")
+                                       .defaultValue(1000000L)
+                                       .withDescription("Sets how many rows 
one localAgg processes. We will infer agg degree to decide whether " +
+                                                       "to use localAgg 
according to it.");
+
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/exec/NodeResourceConfig.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/exec/NodeResourceConfig.java
deleted file mode 100644
index f5f2107..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/exec/NodeResourceConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes.exec;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableConfigOptions;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-/**
- * Deal with resource config for {@link 
org.apache.flink.table.plan.nodes.exec.ExecNode}.
- */
-public class NodeResourceConfig {
-
-       public static final ConfigOption<Integer> 
SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MIN =
-                       key("sql.resource.infer.operator.parallelism.min")
-                                       .defaultValue(1)
-                                       .withDescription("Sets min parallelism 
for operators.");
-
-       /**
-        * Gets the config max num of operator parallelism.
-        *
-        * @param tableConf Configuration.
-        * @return the config max num of operator parallelism.
-        */
-       public static int getOperatorMaxParallelism(Configuration tableConf) {
-               return 
tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MAX);
-       }
-
-       /**
-        * Gets the config min num of operator parallelism.
-        *
-        * @param tableConf Configuration.
-        * @return the config max num of operator parallelism.
-        */
-       public static int getOperatorMinParallelism(Configuration tableConf) {
-               return 
tableConf.getInteger(SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MIN);
-       }
-
-       /**
-        * Gets the config row count that one partition processes.
-        *
-        * @param tableConf Configuration.
-        * @return the config row count that one partition processes.
-        */
-       public static long getRowCountPerPartition(Configuration tableConf) {
-               return 
tableConf.getLong(TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION);
-       }
-
-       /**
-        * Calculates operator parallelism based on rowcount of the operator.
-        *
-        * @param rowCount rowCount of the operator
-        * @param tableConf Configuration.
-        * @return the result of operator parallelism.
-        */
-       public static int calOperatorParallelism(double rowCount, Configuration 
tableConf) {
-               int maxParallelism = getOperatorMaxParallelism(tableConf);
-               int minParallelism = getOperatorMinParallelism(tableConf);
-               int resultParallelism = (int) (rowCount / 
getRowCountPerPartition(tableConf));
-               return Math.max(Math.min(resultParallelism, maxParallelism), 
minParallelism);
-       }
-
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessContext.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessContext.java
new file mode 100644
index 0000000..3d825e0
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessContext.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.process;
+
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * Context for processors to process dag.
+ */
+public class DAGProcessContext {
+
+       private final TableEnvironment tableEnvironment;
+
+       public DAGProcessContext(TableEnvironment tableEnvironment) {
+               this.tableEnvironment = tableEnvironment;
+       }
+
+       /**
+        * Gets the {@link TableEnvironment}: a {@link 
org.apache.flink.table.api.BatchTableEnvironment} for a batch job
+        * and a {@link org.apache.flink.table.api.StreamTableEnvironment} for a 
stream job.
+        */
+       public TableEnvironment getTableEnvironment() {
+               return tableEnvironment;
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessor.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessor.java
new file mode 100644
index 0000000..fd7af93
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/process/DAGProcessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.process;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+
+import java.util.List;
+
+/**
+ * DAGProcessor plugin; it can be used to set resources of the DAG or change other node info.
+ */
+public interface DAGProcessor {
+
+       /**
+        * Given a dag, process it and return the result dag.
+        */
+       List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, 
DAGProcessContext context);
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResource.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResource.java
new file mode 100644
index 0000000..91db90a
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/NodeResource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource;
+
+/**
+ * Resource for a node: currently only parallelism. Other resource dimensions still need to be added.
+ */
+public class NodeResource {
+
+       // node parallelism
+       private int parallelism = -1;
+
+       public int getParallelism() {
+               return parallelism;
+       }
+
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder("{");
+               sb.append("parallelism=").append(parallelism);
+               sb.append("}");
+               return sb.toString();
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetter.java
new file mode 100644
index 0000000..3c574e5
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecValues;
+
+import org.apache.calcite.rel.RelDistribution;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Sets the final parallelism where needed at the start of processing. If the parallelism of a 
node is set to be final,
+ * it will not be changed by other parallelism calculators.
+ */
+public class BatchFinalParallelismSetter {
+
+       private final TableEnvironment tEnv;
+       private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
+       private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new 
HashMap<>();
+
+       private BatchFinalParallelismSetter(TableEnvironment tEnv) {
+               this.tEnv = tEnv;
+       }
+
+       /**
+        * Finding nodes that need to set final parallelism.
+        */
+       public static Map<ExecNode<?, ?>, Integer> calculate(TableEnvironment 
tEnv, List<ExecNode<?, ?>> sinkNodes) {
+               BatchFinalParallelismSetter setter = new 
BatchFinalParallelismSetter(tEnv);
+               sinkNodes.forEach(setter::calculate);
+               return setter.finalParallelismNodeMap;
+       }
+
+       private void calculate(ExecNode<?, ?> batchExecNode) {
+               if (!calculatedNodeSet.add(batchExecNode)) {
+                       return;
+               }
+               if (batchExecNode instanceof BatchExecTableSourceScan) {
+                       calculateTableSource((BatchExecTableSourceScan) 
batchExecNode);
+               } else if (batchExecNode instanceof BatchExecBoundedStreamScan) 
{
+                       calculateBoundedStreamScan((BatchExecBoundedStreamScan) 
batchExecNode);
+               } else if (batchExecNode instanceof BatchExecValues) {
+                       calculateValues((BatchExecValues) batchExecNode);
+               } else {
+                       calculateSingleton(batchExecNode);
+               }
+       }
+
+       private void calculateTableSource(BatchExecTableSourceScan 
tableSourceScan) {
+               StreamTransformation transformation = 
tableSourceScan.getSourceTransformation(tEnv.streamEnv());
+               if (transformation.getMaxParallelism() > 0) {
+                       finalParallelismNodeMap.put(tableSourceScan, 
transformation.getMaxParallelism());
+               }
+       }
+
+       private void calculateBoundedStreamScan(BatchExecBoundedStreamScan 
boundedStreamScan) {
+               StreamTransformation transformation = 
boundedStreamScan.getSourceTransformation();
+               int parallelism = transformation.getParallelism();
+               if (parallelism <= 0) {
+                       parallelism = tEnv.streamEnv().getParallelism();
+               }
+               finalParallelismNodeMap.put(boundedStreamScan, parallelism);
+       }
+
+       private void calculateSingleton(ExecNode<?, ?> execNode) {
+               calculateInputs(execNode);
+               for (ExecNode<?, ?> inputNode : execNode.getInputNodes()) {
+                       if (inputNode instanceof BatchExecExchange) {
+                               // set parallelism as 1 to GlobalAggregate and 
other global node.
+                               if (((BatchExecExchange) 
inputNode).getDistribution().getType() == RelDistribution.Type.SINGLETON) {
+                                       finalParallelismNodeMap.put(execNode, 
1);
+                                       return;
+                               }
+                       }
+               }
+       }
+
+       private void calculateValues(BatchExecValues valuesBatchExec) {
+               finalParallelismNodeMap.put(valuesBatchExec, 1);
+       }
+
+       private void calculateInputs(ExecNode<?, ?> node) {
+               node.getInputNodes().forEach(this::calculate);
+       }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchParallelismProcessor.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchParallelismProcessor.java
new file mode 100644
index 0000000..64b7c28
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchParallelismProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.nodes.exec.BatchExecNode;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink;
+import org.apache.flink.table.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor for calculating parallelism for {@link BatchExecNode}.
+ */
+public class BatchParallelismProcessor implements DAGProcessor {
+
+       @Override
+       public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes, 
DAGProcessContext context) {
+               sinkNodes.forEach(s -> Preconditions.checkArgument(s instanceof 
BatchExecNode));
+               TableEnvironment tEnv = context.getTableEnvironment();
+               List<ExecNode<?, ?>> rootNodes = filterSinkNodes(sinkNodes);
+               Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap = 
BatchFinalParallelismSetter.calculate(tEnv, rootNodes);
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = 
ShuffleStageGenerator.generate(rootNodes, nodeToFinalParallelismMap);
+               new 
BatchShuffleStageParallelismCalculator(tEnv.getConfig().getConf(), 
tEnv.streamEnv().getParallelism()).calculate(nodeShuffleStageMap.values());
+               for (ExecNode<?, ?> node : nodeShuffleStageMap.keySet()) {
+                       
node.getResource().setParallelism(nodeShuffleStageMap.get(node).getParallelism());
+               }
+               return sinkNodes;
+       }
+
+       /**
+        * Filters out sink nodes because the parallelism of sink nodes is calculated 
after translateToPlan, as
+        * transformations generated by BatchExecSink have too many uncertain 
factors. Filtering here
+        * makes later processing easier.
+        */
+       private List<ExecNode<?, ?>> filterSinkNodes(List<ExecNode<?, ?>> 
sinkNodes) {
+               List<ExecNode<?, ?>> rootNodes = new ArrayList<>();
+               sinkNodes.forEach(s -> {
+                       if (s instanceof BatchExecSink) {
+                               rootNodes.add(s.getInputNodes().get(0));
+                       } else {
+                               rootNodes.add(s);
+                       }
+               });
+               return rootNodes;
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculator.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculator.java
new file mode 100644
index 0000000..f72bdb4
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Parallelism calculator for shuffle stage.
+ */
+public class BatchShuffleStageParallelismCalculator {
+       private static final Logger LOG = 
LoggerFactory.getLogger(BatchShuffleStageParallelismCalculator.class);
+       private final Configuration tableConf;
+       private final int envParallelism;
+
+       public BatchShuffleStageParallelismCalculator(Configuration tableConf, 
int envParallelism) {
+               this.tableConf = tableConf;
+               this.envParallelism = envParallelism;
+       }
+
+       public void calculate(Collection<ShuffleStage> shuffleStages) {
+               Set<ShuffleStage> shuffleStageSet = new 
HashSet<>(shuffleStages);
+               shuffleStageSet.forEach(this::calculate);
+       }
+
+       @VisibleForTesting
+       protected void calculate(ShuffleStage shuffleStage) {
+               if (shuffleStage.isFinalParallelism()) {
+                       return;
+               }
+               Set<ExecNode<?, ?>> nodeSet = shuffleStage.getExecNodeSet();
+               int maxSourceParallelism = -1;
+               for (ExecNode<?, ?> node : nodeSet) {
+                       if (node instanceof BatchExecTableSourceScan) {
+                               int result = 
calculateSource((BatchExecTableSourceScan) node);
+                               if (result > maxSourceParallelism) {
+                                       maxSourceParallelism = result;
+                               }
+                       }
+               }
+               if (maxSourceParallelism > 0) {
+                       shuffleStage.setParallelism(maxSourceParallelism, 
false);
+               } else {
+                       
shuffleStage.setParallelism(NodeResourceConfig.getOperatorDefaultParallelism(getTableConf(),
 envParallelism), false);
+               }
+       }
+
+       private int calculateSource(BatchExecTableSourceScan tableSourceScan) {
+               boolean infer = 
!NodeResourceConfig.getInferMode(tableConf).equals(NodeResourceConfig.InferMode.NONE);
+               LOG.info("infer source partitions num: " + infer);
+               if (infer) {
+                       double rowCount = 
tableSourceScan.getEstimatedRowCount();
+                       LOG.info("source row count is : " + rowCount);
+                       long rowsPerPartition = 
NodeResourceConfig.getInferRowCountPerPartition(tableConf);
+                       int maxNum = 
NodeResourceConfig.getSourceMaxParallelism(tableConf);
+                       return Math.min(maxNum,
+                                       Math.max(
+                                                       (int) (rowCount / 
rowsPerPartition),
+                                                       1));
+               } else {
+                       return 
NodeResourceConfig.getSourceParallelism(tableConf, envParallelism);
+               }
+       }
+
+       private Configuration getTableConf() {
+               return this.tableConf;
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/NodeResourceConfig.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/NodeResourceConfig.java
new file mode 100644
index 0000000..f67a6aa
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/NodeResourceConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfigOptions;
+
+/**
+ * Deal with resource config for {@link org.apache.flink.table.plan.nodes.exec.ExecNode}.
+ */
+public class NodeResourceConfig {
+
+       /**
+        * How many Bytes per MB.
+        */
+       public static final long SIZE_IN_MB = 1024L * 1024;
+
+       /**
+        * Gets the config parallelism for source. If the configured value is not positive,
+        * falls back to {@link #getOperatorDefaultParallelism(Configuration, int)}.
+        * @param tableConf Configuration.
+        * @param envParallelism parallelism used as the last fallback when neither the source
+        *                       parallelism nor the default parallelism is configured with a positive value.
+        * @return the config parallelism for source.
+        */
+       public static int getSourceParallelism(Configuration tableConf, int envParallelism) {
+               int parallelism = tableConf.getInteger(
+                               TableConfigOptions.SQL_RESOURCE_SOURCE_PARALLELISM);
+               if (parallelism <= 0) {
+                       parallelism = getOperatorDefaultParallelism(tableConf, envParallelism);
+               }
+               return parallelism;
+       }
+
+       /**
+        * Gets the config parallelism for sink. The configured value is returned as-is, without any
+        * fallback; an unset option is expected to yield -1 (this depends on the default declared
+        * for the option in TableConfigOptions).
+        * @param tableConf Configuration.
+        * @return the config parallelism for sink.
+        */
+       public static int getSinkParallelism(Configuration tableConf) {
+               return tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_SINK_PARALLELISM);
+       }
+
+       /**
+        * Gets the config max num of source parallelism.
+        * @param tableConf Configuration.
+        * @return the config max num of source parallelism.
+        */
+       public static int getSourceMaxParallelism(Configuration tableConf) {
+               return tableConf.getInteger(
+                               TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX);
+       }
+
+       /**
+        * Gets the config row count that one partition processes.
+        * @param tableConf Configuration.
+        * @return the config row count that one partition processes.
+        */
+       public static long getInferRowCountPerPartition(Configuration tableConf) {
+               return tableConf.getLong(
+                               TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION);
+       }
+
+       /**
+        * Gets default parallelism of operator. If the configured value is not positive,
+        * {@code envParallelism} is returned instead.
+        * @param tableConf Configuration.
+        * @param envParallelism parallelism to fall back to when no positive default is configured.
+        * @return default parallelism of operator.
+        */
+       public static int getOperatorDefaultParallelism(Configuration tableConf, int envParallelism) {
+               int parallelism = tableConf.getInteger(
+                               TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM);
+               if (parallelism <= 0) {
+                       parallelism = envParallelism;
+               }
+               return parallelism;
+       }
+
+       /**
+        * Infer resource mode.
+        */
+       public enum InferMode {
+               NONE, ONLY_SOURCE
+       }
+
+       /**
+        * Gets the configured infer mode.
+        * @param tableConf Configuration.
+        * @return the {@link InferMode} whose name equals the configured string.
+        * @throws IllegalArgumentException if the configured string is not the name of an {@link InferMode}.
+        */
+       public static InferMode getInferMode(Configuration tableConf) {
+               String config = tableConf.getString(
+                               TableConfigOptions.SQL_RESOURCE_INFER_MODE);
+               try {
+                       return InferMode.valueOf(config);
+               } catch (IllegalArgumentException ex) {
+                       // Keep the original cause and report the rejected value so that a
+                       // misconfiguration can be diagnosed from the message alone.
+                       throw new IllegalArgumentException(
+                                       "Infer mode can only be set: NONE or ONLY_SOURCE. Invalid value: " + config, ex);
+               }
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStage.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStage.java
new file mode 100644
index 0000000..80aa778
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A group of exec nodes between which data is transferred without any shuffle,
+ * together with the parallelism assigned to that group.
+ */
+public class ShuffleStage {
+
+       // Nodes that belong to this stage; the LinkedHashSet keeps them in insertion order.
+       private final Set<ExecNode<?, ?>> execNodeSet = new LinkedHashSet<>();
+
+       // Parallelism of this stage; it stays at -1 until a value is assigned.
+       private int parallelism = -1;
+
+       // Once true, the parallelism is fixed and later calls can no longer change it.
+       private boolean isFinalParallelism;
+
+       /** Adds a single node to this stage. */
+       public void addNode(ExecNode<?, ?> node) {
+               this.execNodeSet.add(node);
+       }
+
+       /** Adds every node of the given set to this stage. */
+       public void addNodeSet(Set<ExecNode<?, ?>> nodeSet) {
+               this.execNodeSet.addAll(nodeSet);
+       }
+
+       /** Removes the given node from this stage. */
+       public void removeNode(ExecNode<?, ?> node) {
+               execNodeSet.remove(node);
+       }
+
+       /** Returns the backing node set itself, not a copy. */
+       public Set<ExecNode<?, ?>> getExecNodeSet() {
+               return execNodeSet;
+       }
+
+       public int getParallelism() {
+               return this.parallelism;
+       }
+
+       /**
+        * Proposes a parallelism for this stage.
+        *
+        * <p>If the stage already has a final parallelism, the value is left untouched; a second
+        * final request is only accepted when it carries the same value. If the stage is not final
+        * yet, a final request fixes the parallelism, while a non-final request keeps the larger
+        * of the current and the proposed value.
+        *
+        * @param parallelism the proposed parallelism.
+        * @param finalParallelism whether the proposed parallelism must not be changed afterwards.
+        * @throws IllegalArgumentException if a different final parallelism was already set.
+        */
+       public void setParallelism(int parallelism, boolean finalParallelism) {
+               if (isFinalParallelism) {
+                       // Already fixed: only verify that a second final request does not conflict.
+                       if (finalParallelism && this.parallelism != parallelism) {
+                               throw new IllegalArgumentException("both fixed parallelism are not equal, old: " + this.parallelism + ", new: " + parallelism);
+                       }
+                       return;
+               }
+
+               if (!finalParallelism) {
+                       this.parallelism = Math.max(this.parallelism, parallelism);
+                       return;
+               }
+
+               this.parallelism = parallelism;
+               isFinalParallelism = true;
+       }
+
+       public boolean isFinalParallelism() {
+               return this.isFinalParallelism;
+       }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGenerator.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGenerator.java
new file mode 100644
index 0000000..4e111e0
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGenerator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
+
+import org.apache.calcite.rel.RelDistribution;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Build exec nodes to shuffleStages according to {@link BatchExecExchange}.
+ * If there is data shuffle between two adjacent exec nodes,
+ * they belong to different shuffleStages.
+ * If there is no data shuffle between two adjacent exec nodes, but
+ * they have different final parallelism, they also belong to different shuffleStages.
+ */
+public class ShuffleStageGenerator {
+
+       // Stage assigned to each visited node; nodes that act as a stage boundary are never added.
+       private final Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = new LinkedHashMap<>();
+       // Nodes whose parallelism is already fixed, mapped to that parallelism.
+       private final Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap;
+
+       private ShuffleStageGenerator(Map<ExecNode<?, ?>, Integer> nodeToFinalParallelismMap) {
+               this.nodeToFinalParallelismMap = nodeToFinalParallelismMap;
+       }
+
+       /**
+        * Assigns a {@link ShuffleStage} to the nodes reachable from the given sink nodes.
+        *
+        * @param sinkNodes the nodes from which the DAG is traversed towards its inputs.
+        * @param finalParallelismNodeMap nodes whose parallelism is already fixed, mapped to that parallelism.
+        * @return the stage of each node, in visiting order; virtual nodes (see
+        *         {@link #isVirtualNode(ExecNode)}) are contained neither as keys nor in any stage's node set.
+        */
+       public static Map<ExecNode<?, ?>, ShuffleStage> generate(
+                       List<ExecNode<?, ?>> sinkNodes,
+                       Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap) {
+               ShuffleStageGenerator generator = new ShuffleStageGenerator(finalParallelismNodeMap);
+               sinkNodes.forEach(generator::buildShuffleStages);
+               Map<ExecNode<?, ?>, ShuffleStage> result = generator.getNodeShuffleStageMap();
+               // Strip virtual nodes from every stage; they are collected first so that the
+               // node set is not modified while it is being streamed.
+               result.values().forEach(s -> {
+                       List<ExecNode<?, ?>> virtualNodeList =
+                                       s.getExecNodeSet().stream().filter(ShuffleStageGenerator::isVirtualNode).collect(toList());
+                       virtualNodeList.forEach(s::removeNode);
+               });
+               return result.entrySet().stream()
+                               .filter(x -> !isVirtualNode(x.getKey()))
+                               .collect(Collectors.toMap(Map.Entry::getKey,
+                                               Map.Entry::getValue,
+                                               (e1, e2) -> e1,
+                                               LinkedHashMap::new));
+       }
+
+       /**
+        * Visits the inputs of the given node first and then assigns the node itself to a stage.
+        */
+       private void buildShuffleStages(ExecNode<?, ?> execNode) {
+               if (nodeShuffleStageMap.containsKey(execNode)) {
+                       return;
+               }
+               for (ExecNode<?, ?> input : execNode.getInputNodes()) {
+                       buildShuffleStages(input);
+               }
+
+               if (execNode.getInputNodes().isEmpty()) {
+                       // source node
+                       ShuffleStage shuffleStage = new ShuffleStage();
+                       shuffleStage.addNode(execNode);
+                       if (nodeToFinalParallelismMap.containsKey(execNode)) {
+                               shuffleStage.setParallelism(nodeToFinalParallelismMap.get(execNode), true);
+                       }
+                       nodeShuffleStageMap.put(execNode, shuffleStage);
+               } else if (!isShuffleBoundary(execNode)) {
+                       Set<ShuffleStage> inputShuffleStages = getInputShuffleStages(execNode);
+                       Integer parallelism = nodeToFinalParallelismMap.get(execNode);
+                       ShuffleStage inputShuffleStage = mergeInputShuffleStages(inputShuffleStages, parallelism);
+                       inputShuffleStage.addNode(execNode);
+                       nodeShuffleStageMap.put(execNode, inputShuffleStage);
+               }
+               // A shuffle boundary gets no stage, so the node consuming it finds no input stage
+               // through it and therefore starts or joins a different stage.
+       }
+
+       /**
+        * Whether the node separates two stages: an exchange whose distribution type is anything
+        * but {@code RANGE_DISTRIBUTED}. A range-distributed exchange is handled like an ordinary node.
+        */
+       private static boolean isShuffleBoundary(ExecNode<?, ?> execNode) {
+               return execNode instanceof BatchExecExchange &&
+                               ((BatchExecExchange) execNode).getDistribution().getType() != RelDistribution.Type.RANGE_DISTRIBUTED;
+       }
+
+       /**
+        * Picks or creates the stage a node joins and merges the compatible input stages into it.
+        *
+        * <p>If the node has a final parallelism, a new stage fixed to that value is created.
+        * Otherwise the input stage with the largest final parallelism is reused, or a new empty
+        * stage if no input stage is final. Every input stage that is not final, or whose
+        * parallelism equals the target's, is merged into the target; the others are left alone.
+        *
+        * @param shuffleStageSet stages of the node's inputs.
+        * @param parallelism final parallelism of the node, or null if it has none.
+        * @return the stage the node should be added to.
+        */
+       private ShuffleStage mergeInputShuffleStages(Set<ShuffleStage> shuffleStageSet, Integer parallelism) {
+               final ShuffleStage resultShuffleStage;
+               if (parallelism != null) {
+                       resultShuffleStage = new ShuffleStage();
+                       resultShuffleStage.setParallelism(parallelism, true);
+               } else {
+                       resultShuffleStage = shuffleStageSet.stream()
+                                       .filter(ShuffleStage::isFinalParallelism)
+                                       .max(Comparator.comparingInt(ShuffleStage::getParallelism))
+                                       .orElseGet(ShuffleStage::new);
+               }
+               // Merging never changes the target's parallelism, so it can be read once.
+               final int targetParallelism = resultShuffleStage.getParallelism();
+               for (ShuffleStage shuffleStage : shuffleStageSet) {
+                       if (!shuffleStage.isFinalParallelism() || shuffleStage.getParallelism() == targetParallelism) {
+                               mergeShuffleStage(resultShuffleStage, shuffleStage);
+                       }
+               }
+               return resultShuffleStage;
+       }
+
+       /**
+        * Moves all nodes of {@code other} into {@code shuffleStage} and re-points their map entries.
+        */
+       private void mergeShuffleStage(ShuffleStage shuffleStage, ShuffleStage other) {
+               if (shuffleStage == other) {
+                       // The target can itself be one of the input stages; nothing to move then.
+                       return;
+               }
+               Set<ExecNode<?, ?>> nodeSet = other.getExecNodeSet();
+               shuffleStage.addNodeSet(nodeSet);
+               for (ExecNode<?, ?> r : nodeSet) {
+                       nodeShuffleStageMap.put(r, shuffleStage);
+               }
+       }
+
+       /**
+        * Collects the distinct stages of the node's inputs; inputs without a stage are skipped.
+        */
+       private Set<ShuffleStage> getInputShuffleStages(ExecNode<?, ?> node) {
+               Set<ShuffleStage> shuffleStageList = new HashSet<>();
+               for (ExecNode<?, ?> input : node.getInputNodes()) {
+                       ShuffleStage oneInputShuffleStage = nodeShuffleStageMap.get(input);
+                       if (oneInputShuffleStage != null) {
+                               shuffleStageList.add(oneInputShuffleStage);
+                       }
+               }
+               return shuffleStageList;
+       }
+
+       /**
+        * Union nodes take part in building the stages but are dropped from the final result.
+        */
+       private static boolean isVirtualNode(ExecNode<?, ?> node) {
+               return node instanceof BatchExecUnion;
+       }
+
+       private Map<ExecNode<?, ?>, ShuffleStage> getNodeShuffleStageMap() {
+               return nodeShuffleStageMap;
+       }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 05ba98e..79ad59c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.api.graph.{StreamGraph, 
StreamGraphGenerator}
 import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.plan.nodes.process.DAGProcessContext
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.BatchParallelismProcessor
 import 
org.apache.flink.table.plan.optimize.{BatchCommonSubGraphBasedOptimizer, 
Optimizer}
 import org.apache.flink.table.plan.reuse.DeadlockBreakupProcessor
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
TableSourceSinkTable, TableSourceTable}
@@ -50,9 +52,9 @@ import _root_.scala.collection.JavaConversions._
   * @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
   */
 class BatchTableEnvironment(
-    val streamEnv: StreamExecutionEnvironment,
+    val execEnv: StreamExecutionEnvironment,
     config: TableConfig)
-  extends TableEnvironment(config) {
+  extends TableEnvironment(execEnv, config) {
 
   // prefix for unique table names.
   override private[flink] val tableNamePrefix = "_DataStreamTable_"
@@ -120,8 +122,10 @@ class BatchTableEnvironment(
 
   override private[flink] def translateToExecNodeDag(rels: Seq[RelNode]): 
Seq[ExecNode[_, _]] = {
     val nodeDag = super.translateToExecNodeDag(rels)
+    val context = new DAGProcessContext(this)
     // breakup deadlock
-    new DeadlockBreakupProcessor().process(nodeDag)
+    val postNodeDag = new DeadlockBreakupProcessor().process(nodeDag, context)
+    new BatchParallelismProcessor().process(postNodeDag, context)
   }
 
   /**
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8269613..2f31ef3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -67,7 +67,7 @@ import _root_.scala.collection.JavaConversions._
 abstract class StreamTableEnvironment(
     private[flink] val execEnv: StreamExecutionEnvironment,
     config: TableConfig)
-  extends TableEnvironment(config) {
+  extends TableEnvironment(execEnv, config) {
 
   // prefix  for unique table names.
   override private[flink] val tableNamePrefix = "_DataStreamTable_"
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 82d1655..3dbf25c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -70,9 +70,13 @@ import _root_.scala.collection.mutable
 /**
   * The abstract base class for batch and stream TableEnvironments.
   *
+  * @param streamEnv The [[JavaStreamExecEnv]] which is wrapped in this
+  *                [[TableEnvironment]].
   * @param config The configuration of the TableEnvironment
   */
-abstract class TableEnvironment(val config: TableConfig) {
+abstract class TableEnvironment(
+    val streamEnv: JavaStreamExecEnv,
+    val config: TableConfig) {
 
   protected val DEFAULT_JOB_NAME = "Flink Exec Table Job"
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
index 3895292..60bc9f4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCount.scala
@@ -26,10 +26,10 @@ import org.apache.calcite.rel.{RelNode, SingleRel}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.calcite.util._
 import org.apache.flink.table.JDouble
+import org.apache.flink.table.api.PlannerConfigOptions
 import org.apache.flink.table.calcite.FlinkContext
 import org.apache.flink.table.plan.logical.{LogicalWindow, SlidingGroupWindow, 
TumblingGroupWindow}
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
-import org.apache.flink.table.plan.nodes.exec.NodeResourceConfig
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.stats.ValueInterval
 import org.apache.flink.table.plan.util.AggregateUtil.{hasTimeIntervalType, 
toLong}
@@ -171,7 +171,8 @@ class FlinkRelMdRowCount private extends 
MetadataHandler[BuiltInMetadata.RowCoun
     } else {
       val inputRowCnt = mq.getRowCount(input)
       val config = 
rel.getCluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
-      val parallelism = NodeResourceConfig.calOperatorParallelism(inputRowCnt, 
config.getConf)
+      val parallelism = (inputRowCnt /
+          
config.getConf.getLong(PlannerConfigOptions.SQL_OPTIMIZER_ROWS_PER_LOCALAGG) + 
1).toInt
       if (parallelism == 1) {
         ndvOfGroupKeysOnGlobalAgg
       } else if (grouping.isEmpty) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
index d86ecd5..9006ab7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/exec/ExecNode.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.nodes.exec
 import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
+import org.apache.flink.table.plan.nodes.resource.NodeResource
 
 import java.util
 
@@ -33,11 +34,21 @@ import java.util
 trait ExecNode[E <: TableEnvironment, T] {
 
   /**
+    * Defines how much resource the node will take.
+    */
+  private val resource: NodeResource = new NodeResource
+
+  /**
     * The [[StreamTransformation]] translated from this node.
     */
   private var transformation: StreamTransformation[T] = _
 
   /**
+    * Get node resource.
+    */
+  def getResource = resource
+
+  /**
     * Translates this node into a Flink operator.
     *
     * <p>NOTE: returns same translate result if called multiple times.
@@ -83,5 +94,4 @@ trait ExecNode[E <: TableEnvironment, T] {
   def accept(visitor: ExecNodeVisitor): Unit = {
     visitor.visit(this)
   }
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index 4bcb025..8c26e9f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -87,8 +87,9 @@ class BatchExecBoundedStreamScan(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val config = tableEnv.getConfig
     val batchTransform = boundedStreamTable.dataStream.getTransformation
+    batchTransform.setParallelism(getResource.getParallelism)
     if (needInternalConversion) {
-      ScanUtil.convertToInternalRow(
+      val conversionTransform = ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         batchTransform,
         boundedStreamTable.fieldIndexes,
@@ -97,11 +98,16 @@ class BatchExecBoundedStreamScan(
         getTable.getQualifiedName,
         config,
         None)
+      conversionTransform.setParallelism(getResource.getParallelism)
+      conversionTransform
     } else {
       batchTransform.asInstanceOf[StreamTransformation[BaseRow]]
     }
   }
 
+  def getSourceTransformation: StreamTransformation[_] =
+    boundedStreamTable.dataStream.getTransformation
+
   def needInternalConversion: Boolean = {
     ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
         ScanUtil.needsConversion(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
index f0909eb..0b3ca4e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -160,6 +160,6 @@ class BatchExecCalc(
       RelExplainUtil.calcToString(calcProgram, getExpressionString),
       operator,
       BaseRowTypeInfo.of(outputType),
-      
config.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+      getResource.getParallelism)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
index 16f0cb5..9623b37 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -203,7 +203,7 @@ class BatchExecCorrelate(
       condition,
       outputRowType,
       joinType,
-      
tableEnv.getConfig.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM),
+      getResource.getParallelism,
       retainHeader = false,
       getExpressionString,
       "BatchExecCorrelate")
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
index 6c01907..3327e1d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExpand.scala
@@ -104,7 +104,7 @@ class BatchExecExpand(
       operatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      
config.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+      getResource.getParallelism)
   }
 
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
index 3793ed3..105c9b7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregate.scala
@@ -154,12 +154,4 @@ class BatchExecHashAggregate(
     val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
     aggOperatorName(aggregateNamePrefix + "HashAggregate")
   }
-
-  override def getParallelism(input: StreamTransformation[BaseRow], conf: 
TableConfig): Int = {
-    if (isFinal && grouping.length == 0) {
-      1
-    } else {
-      
conf.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM)
-    }
-  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index 85ebbc0..aee08ad 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.streaming.api.transformations.{OneInputTransformation, 
StreamTransformation}
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, 
TableConfigOptions}
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfigOptions}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator, 
HashAggCodeGenerator}
@@ -27,6 +27,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.cost.FlinkCost._
 import org.apache.flink.table.plan.cost.FlinkCostFactory
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
 import 
org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.plan.util.FlinkRelMdUtil
 import org.apache.flink.table.runtime.CodeGenOperatorFactory
@@ -116,8 +117,6 @@ abstract class BatchExecHashAggregateBase(
 
   def getOperatorName: String
 
-  def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): 
Int
-
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val input = getInputNodes.get(0).translateToPlan(tableEnv)
@@ -134,9 +133,9 @@ abstract class BatchExecHashAggregateBase(
         ctx, relBuilder, aggInfos, inputType, outputType, isMerge, isFinal, 
"NoGrouping")
     } else {
       val reservedManagedMem = tableEnv.config.getConf.getInteger(
-        TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * 
TableConfigOptions.SIZE_IN_MB
+        TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * 
NodeResourceConfig.SIZE_IN_MB
       val maxManagedMem = tableEnv.config.getConf.getInteger(
-        TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) * 
TableConfigOptions.SIZE_IN_MB
+        TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) * 
NodeResourceConfig.SIZE_IN_MB
       new HashAggCodeGenerator(
         ctx, relBuilder, aggInfos, inputType, outputType, grouping, 
auxGrouping, isMerge, isFinal
       ).genWithKeys(reservedManagedMem, maxManagedMem)
@@ -147,6 +146,6 @@ abstract class BatchExecHashAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getParallelism(input, tableEnv.config))
+      getResource.getParallelism)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index cb87455..15822b9 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.logical.LogicalWindow
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
 import 
org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.plan.util.FlinkRelMdUtil
 import org.apache.flink.table.runtime.CodeGenOperatorFactory
@@ -132,9 +133,9 @@ abstract class BatchExecHashWindowAggregateBase(
     val (windowSize: Long, slideSize: Long) = 
WindowCodeGenerator.getWindowDef(window)
 
     val reservedManagedMem = tableEnv.config.getConf.getInteger(
-      TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM) * 
NodeResourceConfig.SIZE_IN_MB
     val maxManagedMem = tableEnv.config.getConf.getInteger(
-      TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM) * 
NodeResourceConfig.SIZE_IN_MB
 
     val generatedOperator = new HashWindowCodeGenerator(
       ctx, relBuilder, window, inputTimeFieldIndex,
@@ -148,6 +149,6 @@ abstract class BatchExecHashWindowAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      
tableEnv.getConfig.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+      getResource.getParallelism)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
index 3944461..8a98dbc 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLimit.scala
@@ -110,7 +110,7 @@ class BatchExecLimit(
       getOperatorName,
       operator,
       inputType,
-      if (isGlobal) 1 else input.getParallelism)
+      getResource.getParallelism)
   }
 
   private def getOperatorName = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
index b475f46..ac3e8e0 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
@@ -133,7 +133,4 @@ class BatchExecLocalHashAggregate(
   }
 
   override def getOperatorName: String = aggOperatorName("LocalHashAggregate")
-
-  override def getParallelism(input: StreamTransformation[BaseRow], conf: 
TableConfig): Int =
-    input.getParallelism
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
index e62c850..ce85f2f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
@@ -142,7 +142,4 @@ class BatchExecLocalSortAggregate(
 
   override def getOperatorName: String = aggOperatorName("LocalSortAggregate")
 
-  override def getParallelism(input: StreamTransformation[BaseRow], conf: 
TableConfig): Int =
-    input.getParallelism
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
index 3839755..d8b68fc 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
@@ -19,10 +19,7 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.streaming.api.transformations.StreamTransformation
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.logical.LogicalWindow
 
@@ -91,7 +88,4 @@ class BatchExecLocalSortWindowAggregate(
   override def getDamBehavior: DamBehavior = DamBehavior.MATERIALIZING
 
   override def getOperatorName: String = "LocalSortWindowAggregateBatchExec"
-
-  override def getParallelism(input: StreamTransformation[BaseRow], conf: 
TableConfig): Int =
-    input.getParallelism
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
index 694db12..d6303ce 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLookupJoin.scala
@@ -89,14 +89,12 @@ class BatchExecLookupJoin(
 
     val inputTransformation = getInputNodes.get(0).translateToPlan(tableEnv)
       .asInstanceOf[StreamTransformation[BaseRow]]
-    val defaultParallelism = tableEnv.getConfig.getConf
-      .getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM)
     val transformation = translateToPlanInternal(
       inputTransformation,
       tableEnv.streamEnv,
       tableEnv.config,
       tableEnv.getRelBuilder)
-    transformation.setParallelism(defaultParallelism)
+    transformation.setParallelism(getResource.getParallelism)
     transformation
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index e7b1a4e..cb34e27 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -34,6 +34,7 @@ import 
org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistri
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import 
org.apache.flink.table.plan.nodes.physical.batch.OverWindowMode.OverWindowMode
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
 import 
org.apache.flink.table.plan.util.AggregateUtil.transformToBatchAggregateInfoList
 import org.apache.flink.table.plan.util.OverAggregateUtil.getLongBoundary
 import org.apache.flink.table.plan.util.{FlinkRelOptUtil, OverAggregateUtil, 
RelExplainUtil}
@@ -405,13 +406,13 @@ class BatchExecOverAggregate(
       val windowFrames = createOverWindowFrames(tableEnv)
       new BufferDataOverWindowOperator(
         tableEnv.getConfig.getConf.getInteger(
-          TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * 
TableConfigOptions.SIZE_IN_MB,
+          TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * 
NodeResourceConfig.SIZE_IN_MB,
         windowFrames,
         genComparator,
         inputType.getChildren.forall(t => BinaryRow.isInFixedLengthPart(t)))
     }
     new OneInputTransformation(
-      input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), 
input.getParallelism)
+      input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), 
getResource.getParallelism)
   }
 
   def createOverWindowFrames(tableEnv: BatchTableEnvironment): 
Array[OverWindowFrame] = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
index 2faa699..1dc1be8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala
@@ -288,7 +288,7 @@ class BatchExecRank(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      input.getParallelism)
+      getResource.getParallelism)
   }
 
   private def getOperatorName: String = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 22d88d1..f7565a7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.codegen.{CodeGenUtils, 
CodeGeneratorContext}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
 import org.apache.flink.table.sinks.{BatchTableSink, DataStreamTableSink, 
TableSink}
 import 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
@@ -80,9 +81,22 @@ class BatchExecSink[T](
       case batchTableSink: BatchTableSink[T] =>
         val transformation = translateToStreamTransformation(withChangeFlag = 
false, tableEnv)
         val boundedStream = new DataStream(tableEnv.streamEnv, transformation)
-        batchTableSink.emitBoundedStream(
+        val sinkTransformation = batchTableSink.emitBoundedStream(
           boundedStream, tableEnv.getConfig, 
tableEnv.streamEnv.getConfig).getTransformation
 
+        if (sinkTransformation.getMaxParallelism > 0) {
+          
sinkTransformation.setParallelism(sinkTransformation.getMaxParallelism)
+        } else {
+          val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+            tableEnv.getConfig.getConf)
+          if (configSinkParallelism > 0) {
+            sinkTransformation.setParallelism(configSinkParallelism)
+          } else if (boundedStream.getParallelism > 0) {
+            sinkTransformation.setParallelism(boundedStream.getParallelism)
+          }
+        }
+        sinkTransformation
+
       case streamTableSink: DataStreamTableSink[T] =>
         // In case of table to bounded stream through 
BatchTableEnvironment#toBoundedStream, we
         // insert a DataStreamTableSink then wrap it as a LogicalSink, there 
is no real batch table
@@ -133,6 +147,4 @@ class BatchExecSink[T](
                                    "This is a bug and should not happen. 
Please file an issue.")
     }
   }
-
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
index e9f9ee8..ee106f3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.codegen.sort.SortCodeGenerator
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
 import org.apache.flink.table.plan.util.{FlinkRelMdUtil, RelExplainUtil, 
SortUtil}
 import org.apache.flink.table.runtime.sort.SortOperator
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
@@ -114,12 +115,12 @@ class BatchExecSort(
     val codeGen = new SortCodeGenerator(conf, keys, keyTypes, orders, 
nullsIsLast)
 
     val reservedMemorySize = conf.getConf.getInteger(
-      TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * 
NodeResourceConfig.SIZE_IN_MB
 
     val maxMemorySize = conf.getConf.getInteger(
-      TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MAX_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MAX_MEM) * 
NodeResourceConfig.SIZE_IN_MB
     val perRequestSize = conf.getConf.getInteger(
-      TableConfigOptions.SQL_EXEC_PER_REQUEST_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_EXEC_PER_REQUEST_MEM) * 
NodeResourceConfig.SIZE_IN_MB
 
     val operator = new SortOperator(
       reservedMemorySize,
@@ -133,6 +134,6 @@ class BatchExecSort(
       s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
       operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
       BaseRowTypeInfo.of(outputType),
-      input.getParallelism)
+      getResource.getParallelism)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
index 1fded57..b4c7674 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregate.scala
@@ -169,8 +169,4 @@ class BatchExecSortAggregate(
     aggOperatorName(aggregateNamePrefix + "SortAggregate")
   }
 
-  override def getParallelism(input: StreamTransformation[BaseRow], conf: 
TableConfig): Int = {
-    if (isFinal && grouping.length == 0) 1 else input.getParallelism
-  }
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
index 094df9b..f663fe8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
@@ -18,10 +18,10 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.streaming.api.transformations.{OneInputTransformation, 
StreamTransformation}
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, 
TableConfigOptions}
+import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
-import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator, 
HashAggCodeGenerator, SortAggCodeGenerator}
+import org.apache.flink.table.codegen.agg.batch.{AggWithoutKeysCodeGenerator, 
SortAggCodeGenerator}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
@@ -100,8 +100,6 @@ abstract class BatchExecSortAggregateBase(
 
   def getOperatorName: String
 
-  def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): 
Int
-
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val input = getInputNodes.get(0).translateToPlan(tableEnv)
@@ -126,6 +124,6 @@ abstract class BatchExecSortAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getParallelism(input, tableEnv.config))
+      getResource.getParallelism)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
index 76545a5..872f314 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala
@@ -144,7 +144,7 @@ class BatchExecSortLimit(
       getOperatorName,
       operator,
       inputType,
-      if (isGlobal) 1 else input.getParallelism)
+      getResource.getParallelism)
   }
 
   private def getOperatorName = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index 3a8217c..e2212b1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory}
 import org.apache.flink.table.plan.nodes.ExpressionFormat
 import org.apache.flink.table.plan.nodes.exec.ExecNode
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
 import org.apache.flink.table.plan.util.{FlinkRelMdUtil, FlinkRelOptUtil, 
JoinUtil, SortUtil}
 import org.apache.flink.table.runtime.join.{FlinkJoinType, 
SortMergeJoinOperator}
 import org.apache.flink.table.types.logical.RowType
@@ -228,10 +229,10 @@ class BatchExecSortMergeJoin(
     val condFunc = generateCondition(config, leftType, rightType)
 
     val externalBufferMemory = config.getConf.getInteger(
-      TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM) * 
NodeResourceConfig.SIZE_IN_MB
 
     val sortMemory = config.getConf.getInteger(
-      TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * 
TableConfigOptions.SIZE_IN_MB
+      TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM) * 
NodeResourceConfig.SIZE_IN_MB
 
     def newSortGen(originalKeys: Array[Int], t: RowType): SortCodeGenerator = {
       val originalOrders = originalKeys.map(_ => true)
@@ -270,7 +271,7 @@ class BatchExecSortMergeJoin(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
-      
tableEnv.getConfig.getConf.getInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM))
+      getResource.getParallelism)
   }
 
   private def estimateOutputSize(relNode: RelNode): Double = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
index 2e0a526..7568444 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
@@ -96,8 +96,4 @@ class BatchExecSortWindowAggregate(
     val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
     aggregateNamePrefix + "WindowSortAggregateBatchExec"
   }
-
-  override def getParallelism(input: StreamTransformation[BaseRow], conf: 
TableConfig): Int = {
-    if (isFinal && grouping.length == 0) 1 else input.getParallelism
-  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index b0323d8..c48e648 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -105,8 +105,6 @@ abstract class BatchExecSortWindowAggregateBase(
 
   def getOperatorName: String
 
-  def getParallelism(input: StreamTransformation[BaseRow], conf: TableConfig): 
Int
-
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val input = getInputNodes.get(0).translateToPlan(tableEnv)
@@ -140,6 +138,6 @@ abstract class BatchExecSortWindowAggregateBase(
       getOperatorName,
       operator,
       BaseRowTypeInfo.of(outputType),
-      getParallelism(input, tableEnv.config))
+      getResource.getParallelism)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index f25538f..ce0a826 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -18,8 +18,14 @@
 
 package org.apache.flink.table.plan.nodes.physical.batch
 
+import java.{lang, util}
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.codegen.CodeGeneratorContext
@@ -29,7 +35,6 @@ import 
org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.plan.util.ScanUtil
 import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
-import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -37,6 +42,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
 
 import java.util
+import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import scala.collection.JavaConversions._
 
@@ -82,7 +88,8 @@ class BatchExecTableSourceScan(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val config = tableEnv.getConfig
     val bts = tableSource.asInstanceOf[BatchTableSource[_]]
-    val inputTransform = 
bts.getBoundedStream(tableEnv.streamEnv).getTransformation
+    val inputTransform = 
bts.getBoundedStream(tableEnv.execEnv).getTransformation
+    inputTransform.setParallelism(getResource.getParallelism)
 
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
@@ -108,7 +115,7 @@ class BatchExecTableSourceScan(
       tableEnv.getRelBuilder
     )
     if (needInternalConversion) {
-      ScanUtil.convertToInternalRow(
+      val conversionTransform = ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         inputTransform.asInstanceOf[StreamTransformation[Any]],
         fieldIndexes,
@@ -117,6 +124,8 @@ class BatchExecTableSourceScan(
         getTable.getQualifiedName,
         config,
         rowtimeExpression)
+      conversionTransform.setParallelism(getResource.getParallelism)
+      conversionTransform
     } else {
       inputTransform.asInstanceOf[StreamTransformation[BaseRow]]
     }
@@ -135,4 +144,13 @@ class BatchExecTableSourceScan(
           tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
           .getTypeClass.asInstanceOf[Class[_]])
   }
+
+  def getSourceTransformation(
+      streamEnv: StreamExecutionEnvironment): StreamTransformation[_] = {
+    
tableSource.asInstanceOf[BatchTableSource[_]].getBoundedStream(streamEnv).getTransformation
+  }
+
+  def getEstimatedRowCount: lang.Double = {
+    getCluster.getMetadataQuery.getRowCount(this)
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
index d6f0dca..32dbcd6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
@@ -79,7 +79,10 @@ class BatchExecValues(
       getRowType,
       tuples,
       getRelTypeName)
-    tableEnv.streamEnv.createInput(inputFormat, 
inputFormat.getProducedType).getTransformation
+    val transformation = tableEnv.streamEnv.createInput(inputFormat,
+      inputFormat.getProducedType).getTransformation
+    transformation.setParallelism(getResource.getParallelism)
+    transformation
   }
 
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
index 475c0c6..2106d0b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/reuse/DeadlockBreakupProcessor.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode, 
ExecNodeVisitorImpl}
 import org.apache.flink.table.plan.nodes.physical.batch._
+import org.apache.flink.table.plan.nodes.process.{DAGProcessContext, 
DAGProcessor}
 
 import com.google.common.collect.{Maps, Sets}
 import org.apache.calcite.rel.RelNode
@@ -82,9 +83,10 @@ import scala.collection.mutable
   *                ScanTableSource
   * }}}
   */
-class DeadlockBreakupProcessor {
+class DeadlockBreakupProcessor extends DAGProcessor {
 
-  def process(rootNodes: util.List[ExecNode[_, _]]): util.List[ExecNode[_, _]] 
= {
+  def process(rootNodes: util.List[ExecNode[_, _]],
+      context: DAGProcessContext): util.List[ExecNode[_, _]] = {
     if (!rootNodes.forall(_.isInstanceOf[BatchExecNode[_]])) {
       throw new TableException("Only BatchExecNode DAG is supported now")
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
index 0f5ecfc..4184e9c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExecNodePlanDumper.scala
@@ -58,13 +58,15 @@ object ExecNodePlanDumper {
       detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withExecNodeId: Boolean = false,
       withRetractTraits: Boolean = false,
-      withOutputType: Boolean = false): String = {
+      withOutputType: Boolean = false,
+      withResource: Boolean = false): String = {
     doConvertTreeToString(
       node,
       detailLevel = detailLevel,
       withExecNodeId = withExecNodeId,
       withRetractTraits = withRetractTraits,
-      withOutputType = withOutputType)
+      withOutputType = withOutputType,
+      withResource = withResource)
   }
 
   /**
@@ -83,14 +85,16 @@ object ExecNodePlanDumper {
       detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
       withExecNodeId: Boolean = false,
       withRetractTraits: Boolean = false,
-      withOutputType: Boolean = false): String = {
+      withOutputType: Boolean = false,
+      withResource: Boolean = false): String = {
     if (nodes.length == 1) {
       return treeToString(
         nodes.head,
         detailLevel,
         withExecNodeId = withExecNodeId,
         withRetractTraits = withRetractTraits,
-        withOutputType = withOutputType)
+        withOutputType = withOutputType,
+        withResource = withResource)
     }
 
     val reuseInfoBuilder = new ReuseInfoBuilder()
@@ -124,7 +128,8 @@ object ExecNodePlanDumper {
             withRetractTraits = withRetractTraits,
             withOutputType = withOutputType,
             stopExplainNodes = Some(stopExplainNodes),
-            reuseInfoMap = Some(reuseInfoMap))
+            reuseInfoMap = Some(reuseInfoMap),
+            withResource = withResource)
           sb.append(reusePlan).append(System.lineSeparator)
           if (isReuseNode) {
             // update visit info after the reuse node visited
@@ -151,7 +156,8 @@ object ExecNodePlanDumper {
       withRetractTraits: Boolean = false,
       withOutputType: Boolean = false,
       stopExplainNodes: Option[util.Set[ExecNode[_, _]]] = None,
-      reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, 
Boolean)]] = None
+      reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, 
Boolean)]] = None,
+      withResource: Boolean = false
   ): String = {
     // TODO refactor this part of code
     //  get ExecNode explain value by RelNode#explain now
@@ -164,7 +170,8 @@ object ExecNodePlanDumper {
       withRetractTraits = withRetractTraits,
       withOutputType = withOutputType,
       stopExplainNodes = stopExplainNodes,
-      reuseInfoMap = reuseInfoMap)
+      reuseInfoMap = reuseInfoMap,
+      withResource = withResource)
     node.asInstanceOf[RelNode].explain(planWriter)
     sw.toString
   }
@@ -217,7 +224,8 @@ class NodeTreeWriterImpl(
     withRetractTraits: Boolean = false,
     withOutputType: Boolean = false,
     stopExplainNodes: Option[util.Set[ExecNode[_, _]]] = None,
-    reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, 
Boolean)]] = None)
+    reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, 
Boolean)]] = None,
+    withResource: Boolean = false)
   extends RelWriterImpl(pw, explainLevel, false) {
 
   require((stopExplainNodes.isEmpty && reuseInfoMap.isEmpty) ||
@@ -323,6 +331,10 @@ class NodeTreeWriterImpl(
         printValues.add(Pair.of("__id__", rel.getId.toString))
       }
 
+      if (withResource) {
+        printValues.add(Pair.of("resource", node.getResource))
+      }
+
       if (withRetractTraits) {
         rel match {
           case streamRel: StreamPhysicalRel =>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
new file mode 100644
index 0000000..5ecbb48
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource;
+
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Base for test with mock node list.
+ */
+public class MockNodeTestBase {
+
+       protected List<ExecNode> nodeList;
+
+       /**
+        * Swaps the node at {@code index} for the given mock and installs the default stubs.
+        *
+        * <p>Every node returns a fresh {@link NodeResource} and the string form
+        * {@code "id: <index>"}. A table source scan additionally returns a mocked source
+        * transformation whose max parallelism is -1, a bounded stream scan returns a mocked
+        * source transformation, and an exchange reports a broadcast distribution.
+        *
+        * @param index position in {@code nodeList} that is overwritten
+        * @param node mock that replaces the current node
+        */
+       protected void updateNode(int index, ExecNode<?, ?> node) {
+               nodeList.set(index, node);
+               when(node.getResource()).thenReturn(new NodeResource());
+               when(node.toString()).thenReturn("id: " + index);
+
+               if (node instanceof BatchExecTableSourceScan) {
+                       BatchExecTableSourceScan tableScan = (BatchExecTableSourceScan) node;
+                       StreamTransformation sourceTransformation = mock(StreamTransformation.class);
+                       when(tableScan.getSourceTransformation(any())).thenReturn(sourceTransformation);
+                       when(sourceTransformation.getMaxParallelism()).thenReturn(-1);
+                       return;
+               }
+               if (node instanceof BatchExecBoundedStreamScan) {
+                       BatchExecBoundedStreamScan boundedScan = (BatchExecBoundedStreamScan) node;
+                       StreamTransformation sourceTransformation = mock(StreamTransformation.class);
+                       when(boundedScan.getSourceTransformation()).thenReturn(sourceTransformation);
+                       return;
+               }
+               if (node instanceof BatchExecExchange) {
+                       BatchExecExchange exchange = (BatchExecExchange) node;
+                       RelDistribution broadcast = mock(RelDistribution.class);
+                       when(broadcast.getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
+                       when(exchange.getDistribution()).thenReturn(broadcast);
+               }
+       }
+
+       /**
+        * Re-initialises {@code nodeList} with {@code num} mocked {@link BatchExecCalc} nodes.
+        *
+        * <p>Each mock reports an empty input list and the string form {@code "id: <position>"}.
+        *
+        * @param num number of mock nodes to create
+        */
+       protected void createNodeList(int num) {
+               nodeList = new LinkedList<>();
+               int position = 0;
+               while (position < num) {
+                       ExecNode<?, ?> calc = mock(BatchExecCalc.class);
+                       when(calc.getInputNodes()).thenReturn(new ArrayList<>());
+                       when(calc.toString()).thenReturn("id: " + position);
+                       nodeList.add(calc);
+                       position++;
+               }
+       }
+
+       protected void connect(int nodeIndex, int... inputNodeIndexes) {
+               List<ExecNode<?, ?>> inputNodes = new 
ArrayList<>(inputNodeIndexes.length);
+               for (int inputIndex : inputNodeIndexes) {
+                       ExecNode<?, ?> input = nodeList.get(inputIndex);
+                       inputNodes.add(input);
+               }
+               
when(nodeList.get(nodeIndex).getInputNodes()).thenReturn(inputNodes);
+               if (inputNodeIndexes.length == 1 && nodeList.get(nodeIndex) 
instanceof SingleRel) {
+                       when(((SingleRel) 
nodeList.get(nodeIndex)).getInput()).thenReturn((RelNode) 
nodeList.get(inputNodeIndexes[0]));
+               } else if (inputNodeIndexes.length == 2 && 
nodeList.get(nodeIndex) instanceof BiRel) {
+                       when(((BiRel) 
nodeList.get(nodeIndex)).getLeft()).thenReturn((RelNode) 
nodeList.get(inputNodeIndexes[0]));
+                       when(((BiRel) 
nodeList.get(nodeIndex)).getRight()).thenReturn((RelNode) 
nodeList.get(inputNodeIndexes[1]));
+               }
+       }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetterTest.java
new file mode 100644
index 0000000..9d54445
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchFinalParallelismSetterTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.BatchTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecValues;
+import org.apache.flink.table.plan.nodes.resource.MockNodeTestBase;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for BatchFinalParallelismSetter.
+ */
+public class BatchFinalParallelismSetterTest extends MockNodeTestBase {
+
+       private StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+       private BatchTableEnvironment tEnv;
+
+       /**
+        * Creates the batch table environment on top of {@code sEnv} and sets the environment
+        * parallelism to 21, the value {@code testSource} expects for node 5.
+        */
+       @Before
+       public void setUp() {
+               tEnv = TableEnvironment.getBatchTableEnvironment(sEnv);
+               sEnv.setParallelism(21);
+       }
+
+       /**
+        * Checks the final parallelism reported for the source-like inputs of a union.
+        *
+        * <p>Expected entries: table source scan 0 (max parallelism stubbed to 5) maps to 5,
+        * values node 2 maps to 1, bounded stream scan 4 (transformation parallelism stubbed to 7)
+        * maps to 7 and bounded stream scan 5 maps to 21. Table source scan 1 and the union are
+        * expected to be absent, since the map must contain exactly four entries.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSource() {
+               /*
+                *   0, Source   1, Source  2, Values  4, Source   5, Source
+                *            \      /      /            /           /
+                *             3, Union
+                */
+               createNodeList(6);
+               BatchExecTableSourceScan limitedTableScan = mock(BatchExecTableSourceScan.class);
+               updateNode(0, limitedTableScan);
+               when(limitedTableScan.getSourceTransformation(any()).getMaxParallelism()).thenReturn(5);
+               updateNode(1, mock(BatchExecTableSourceScan.class));
+               updateNode(2, mock(BatchExecValues.class));
+               updateNode(3, mock(BatchExecUnion.class));
+               BatchExecBoundedStreamScan fixedBoundedScan = mock(BatchExecBoundedStreamScan.class);
+               updateNode(4, fixedBoundedScan);
+               when(fixedBoundedScan.getSourceTransformation().getParallelism()).thenReturn(7);
+               updateNode(5, mock(BatchExecBoundedStreamScan.class));
+               connect(3, 0, 1, 2, 4, 5);
+
+               Map<ExecNode<?, ?>, Integer> parallelismByNode =
+                       BatchFinalParallelismSetter.calculate(tEnv, Collections.singletonList(nodeList.get(3)));
+
+               assertEquals(4, parallelismByNode.size());
+               assertEquals(5, parallelismByNode.get(nodeList.get(0)).intValue());
+               assertEquals(1, parallelismByNode.get(nodeList.get(2)).intValue());
+               assertEquals(7, parallelismByNode.get(nodeList.get(4)).intValue());
+               assertEquals(21, parallelismByNode.get(nodeList.get(5)).intValue());
+       }
+
+       /**
+        * Checks the final parallelism reported around exchanges.
+        *
+        * <p>Exchange 2 is broadcast, exchanges 3 and 9 are singleton. Expected entries: table
+        * source scan 0 (max parallelism stubbed to 5) maps to 5, and calc 5 and join 7, which
+        * read from the singleton exchanges, map to 1. The map must contain exactly these three
+        * entries, so calc 4 behind the broadcast exchange is expected to be absent.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testExchange() {
+               /*
+                *   0, Source    1, Source               10, Source
+                *        |         |                       |
+                *   2, Exchange  3, Exchange 8, Source  9, Exchange
+                *        |         |              \     /
+                *   4, Calc      5, Calc          7, Join
+                *         \         /              /
+                *          6, Union
+                */
+               createNodeList(11);
+               BatchExecTableSourceScan limitedTableScan = mock(BatchExecTableSourceScan.class);
+               updateNode(0, limitedTableScan);
+               when(limitedTableScan.getSourceTransformation(any()).getMaxParallelism()).thenReturn(5);
+
+               // The stubbing order relative to updateNode(...) is kept exactly as it was.
+               BatchExecExchange broadcastExchange2 = mock(BatchExecExchange.class, RETURNS_DEEP_STUBS);
+               when(broadcastExchange2.getDistribution().getType()).thenReturn(RelDistribution.Type.BROADCAST_DISTRIBUTED);
+               updateNode(2, broadcastExchange2);
+
+               BatchExecExchange singletonExchange3 = mock(BatchExecExchange.class, RETURNS_DEEP_STUBS);
+               updateNode(3, singletonExchange3);
+               when(singletonExchange3.getDistribution().getType()).thenReturn(RelDistribution.Type.SINGLETON);
+
+               BatchExecExchange singletonExchange9 = mock(BatchExecExchange.class, RETURNS_DEEP_STUBS);
+               updateNode(9, singletonExchange9);
+               when(singletonExchange9.getDistribution().getType()).thenReturn(RelDistribution.Type.SINGLETON);
+
+               connect(2, 0);
+               connect(4, 2);
+               connect(3, 1);
+               connect(5, 3);
+               connect(6, 4, 5, 7);
+               connect(7, 8, 9);
+               connect(9, 10);
+
+               Map<ExecNode<?, ?>, Integer> parallelismByNode =
+                       BatchFinalParallelismSetter.calculate(tEnv, Collections.singletonList(nodeList.get(6)));
+
+               assertEquals(3, parallelismByNode.size());
+               assertEquals(5, parallelismByNode.get(nodeList.get(0)).intValue());
+               assertEquals(1, parallelismByNode.get(nodeList.get(5)).intValue());
+               assertEquals(1, parallelismByNode.get(nodeList.get(7)).intValue());
+       }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculatorTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculatorTest.java
new file mode 100644
index 0000000..1ce39ca
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/BatchShuffleStageParallelismCalculatorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfigOptions;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link BatchShuffleStageParallelismCalculator}.
+ */
+public class BatchShuffleStageParallelismCalculatorTest {
+
+       private Configuration tableConf;
+       private BatchExecTableSourceScan tableSourceScan = 
mock(BatchExecTableSourceScan.class);
+       private int envParallelism = 5;
+
+       @Before
+       public void setUp() {
+               tableConf = new Configuration();
+               tableConf.setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, 
NodeResourceConfig.InferMode.ONLY_SOURCE.toString());
+               
tableConf.setLong(TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION, 
100);
+               
tableConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 50);
+               when(tableSourceScan.getEstimatedRowCount()).thenReturn(3000d);
+       }
+
+       /**
+        * A stage that holds only the table source scan must be given parallelism 30
+        * (non-final) under the default test configuration.
+        */
+       @Test
+       public void testOnlySource() {
+               Set<ExecNode<?, ?>> stageNodes = getNodeSet(Arrays.asList(tableSourceScan));
+               ShuffleStage stage = mock(ShuffleStage.class);
+               when(stage.getExecNodeSet()).thenReturn(stageNodes);
+
+               BatchShuffleStageParallelismCalculator calculator =
+                       new BatchShuffleStageParallelismCalculator(tableConf, envParallelism);
+               calculator.calculate(stage);
+
+               verify(stage).setParallelism(30, false);
+       }
+
+       /**
+        * A stage that holds the table source scan plus a calc must still be given
+        * parallelism 30 (non-final) under the default test configuration.
+        */
+       @Test
+       public void testSourceAndCalc() {
+               BatchExecCalc calc = mock(BatchExecCalc.class);
+               Set<ExecNode<?, ?>> stageNodes = getNodeSet(Arrays.asList(tableSourceScan, calc));
+               ShuffleStage stage = mock(ShuffleStage.class);
+               when(stage.getExecNodeSet()).thenReturn(stageNodes);
+
+               BatchShuffleStageParallelismCalculator calculator =
+                       new BatchShuffleStageParallelismCalculator(tableConf, envParallelism);
+               calculator.calculate(stage);
+
+               verify(stage).setParallelism(30, false);
+       }
+
+       /**
+        * A stage without any source node must be given the configured default
+        * parallelism of 50 (non-final).
+        */
+       @Test
+       public void testNoSource() {
+               BatchExecCalc calc = mock(BatchExecCalc.class);
+               Set<ExecNode<?, ?>> stageNodes = getNodeSet(Arrays.asList(calc));
+               ShuffleStage stage = mock(ShuffleStage.class);
+               when(stage.getExecNodeSet()).thenReturn(stageNodes);
+
+               BatchShuffleStageParallelismCalculator calculator =
+                       new BatchShuffleStageParallelismCalculator(tableConf, envParallelism);
+               calculator.calculate(stage);
+
+               verify(stage).setParallelism(50, false);
+       }
+
+       /**
+        * With infer mode {@code NONE} and the default parallelism set to -1, the stage must be
+        * given the environment parallelism passed to the calculator (5, non-final).
+        */
+       @Test
+       public void testEnvParallelism() {
+               tableConf.setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, NodeResourceConfig.InferMode.NONE.toString());
+               tableConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, -1);
+
+               BatchExecCalc calc = mock(BatchExecCalc.class);
+               Set<ExecNode<?, ?>> stageNodes = getNodeSet(Arrays.asList(tableSourceScan, calc));
+               ShuffleStage stage = mock(ShuffleStage.class);
+               when(stage.getExecNodeSet()).thenReturn(stageNodes);
+
+               BatchShuffleStageParallelismCalculator calculator =
+                       new BatchShuffleStageParallelismCalculator(tableConf, envParallelism);
+               calculator.calculate(stage);
+
+               verify(stage).setParallelism(5, false);
+       }
+
+       /**
+        * Copies the given nodes into a new {@link HashSet}.
+        *
+        * @param nodeList nodes to copy
+        * @return a new mutable set containing the given nodes
+        */
+       private Set<ExecNode<?, ?>> getNodeSet(List<ExecNode<?, ?>> nodeList) {
+               return new HashSet<>(nodeList);
+       }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGeneratorTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGeneratorTest.java
new file mode 100644
index 0000000..48ce447
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/batch/parallelism/ShuffleStageGeneratorTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.batch.parallelism;
+
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecCalc;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecExchange;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecUnion;
+import org.apache.flink.table.plan.nodes.resource.MockNodeTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for {@link ShuffleStageGenerator}.
+ */
+public class ShuffleStageGeneratorTest extends MockNodeTestBase {
+
+       private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap;
+
+       /**
+        * Starts every test with an empty {@code finalParallelismNodeMap}.
+        */
+       @Before
+       public void setUp() {
+               finalParallelismNodeMap = new HashMap<>();
+       }
+
+       /**
+        * Generates shuffle stages for a plan with one union and two exchanges and no node with
+        * a final parallelism.
+        *
+        * <p>Expected stages: {7, 8} and {0, 1, 3, 4}. The union (2) and the exchanges (5, 6)
+        * are not part of the expected stage node sets.
+        *
+        * <p>NOTE(review): the method name contains a typo ("Stags" instead of "Stages"); it is
+        * left unchanged here.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testGenerateShuffleStags() {
+               /**
+                *
+                *    0, Source     1, Source
+                *             \     /
+                *             2, Union
+                *             /     \
+                *        3, Calc   4, Calc
+                *           |        |
+                *    5, Exchange    6, Exchange
+                *            \      /
+                *              7, Join
+                *               |
+                *              8, Calc
+                */
+               createNodeList(9);
+               updateNode(2, mock(BatchExecUnion.class));
+               updateNode(5, mock(BatchExecExchange.class));
+               updateNode(6, mock(BatchExecExchange.class));
+               connect(2, 0, 1);
+               connect(3, 2);
+               connect(4, 2);
+               connect(5, 3);
+               connect(6, 4);
+               connect(7, 5, 6);
+               connect(8, 7);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = 
ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(8)), 
finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 7, 8);
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 3, 4);
+       }
+
+       /**
+        * Generates shuffle stages for a plan in which several nodes feed more than one consumer
+        * and no node has a final parallelism.
+        *
+        * <p>Expected stages: {0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 14, 16} and {6}. The exchanges
+        * (7, 12, 15) and the union (13) are not part of the expected stage node sets.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testMultiOutput() {
+               /**
+                *
+                *    0, Source     2, Source  4, Source   6, Source
+                *       |            |         |             |
+                *    1, Calc       3, Calc    5, Calc     7, Exchange
+                *            \     /      \   /       \    /
+                *            8, Join     9, Join     10, Join
+                *             \          /       \    /
+                *              \   12, Exchange  \   /
+                *               \      /         \  /
+                *                 11, Join      13, Union
+                *                         \      |
+                *                15, Exchange   14, Calc
+                *                           \   /
+                *                           16, Join
+                */
+               createNodeList(17);
+               updateNode(7, mock(BatchExecExchange.class));
+               updateNode(12, mock(BatchExecExchange.class));
+               updateNode(13, mock(BatchExecUnion.class));
+               updateNode(15, mock(BatchExecExchange.class));
+               connect(1, 0);
+               connect(3, 2);
+               connect(5, 4);
+               connect(7, 6);
+               connect(8, 1, 3);
+               connect(9, 3, 5);
+               connect(10, 5, 7);
+               connect(12, 9);
+               connect(11, 8, 12);
+               connect(13, 9, 10);
+               connect(14, 13);
+               connect(15, 11);
+               connect(16, 15, 14);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = 
ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(16)), 
finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 8, 3, 2, 9, 
5, 4, 10, 11, 14, 16);
+               assertSameShuffleStage(nodeShuffleStageMap, 6);
+       }
+
+       /**
+        * Generates shuffle stages for a three-input union where two of the sources have
+        * different final parallelisms (source 0: 10, source 2: 11).
+        *
+        * <p>Expected stages: {0, 1} and {2, 3, 5, 6}. The union (4) is not part of the
+        * expected stage node sets.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testWithFinalParallelism() {
+               /*
+                *
+                *    0, Source    2, Source
+                *       |            |
+                *    1, Calc      3, Calc  6, Source
+                *             \     /     /
+                *               4, Union
+                *                 |
+                *               5, Calc
+                */
+               createNodeList(7);
+               ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+               ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+               updateNode(0, scan0);
+               finalParallelismNodeMap.put(scan0, 10);
+               updateNode(2, scan1);
+               finalParallelismNodeMap.put(scan1, 11);
+               updateNode(4, mock(BatchExecUnion.class));
+               updateNode(5, mock(BatchExecCalc.class));
+               updateNode(6, mock(BatchExecTableSourceScan.class));
+               connect(1, 0);
+               connect(3, 2);
+               connect(4, 1, 3, 6);
+               connect(5, 4);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap =
+                       ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+               assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5);
+       }
+
+       /**
+        * Generates shuffle stages for a two-input union where both sources and the calc after
+        * the union have different final parallelisms (source 0: 10, source 2: 11, calc 5: 12).
+        *
+        * <p>Expected stages: {0, 1}, {2, 3} and {5}. The union (4) is not part of the expected
+        * stage node sets.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testWithFinalParallelism1() {
+               /*
+                *
+                *    0, Source    2, Source
+                *       |            |
+                *    1, Calc      3, Calc
+                *             \     /
+                *               4, Union
+                *                 |
+                *               5, Calc
+                */
+               // The plan has exactly six nodes (0..5); no spare, unconnected node is created.
+               createNodeList(6);
+               ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+               ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+               updateNode(0, scan0);
+               finalParallelismNodeMap.put(scan0, 10);
+               updateNode(2, scan1);
+               finalParallelismNodeMap.put(scan1, 11);
+               updateNode(4, mock(BatchExecUnion.class));
+               ExecNode<?, ?> calc = mock(BatchExecCalc.class);
+               updateNode(5, calc);
+               finalParallelismNodeMap.put(calc, 12);
+               connect(1, 0);
+               connect(3, 2);
+               connect(4, 1, 3);
+               connect(5, 4);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap =
+                       ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+               assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
+               assertSameShuffleStage(nodeShuffleStageMap, 5);
+       }
+
+       /**
+        * Generates shuffle stages for a union whose second branch contains an exchange, with
+        * final parallelisms on source 0 (10), source 2 (11) and calc 4 (1).
+        *
+        * <p>Expected stages: {0, 1, 6}, {2} and {4}. The exchange (3) and the union (5) are not
+        * part of the expected stage node sets.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testWithFinalParallelism2() {
+               /**
+                *
+                *    0, Source    2, Source
+                *       |            |
+                *       |         3, Exchange
+                *       |            |
+                *    1, Calc      4, Calc
+                *             \     /
+                *               5, Union
+                *                 |
+                *               6, Calc
+                */
+               createNodeList(7);
+               ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+               ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+               updateNode(0, scan0);
+               finalParallelismNodeMap.put(scan0, 10);
+               updateNode(2, scan1);
+               finalParallelismNodeMap.put(scan1, 11);
+               updateNode(3, mock(BatchExecExchange.class));
+               ExecNode<?, ?> calc = mock(BatchExecCalc.class);
+               updateNode(4, calc);
+               finalParallelismNodeMap.put(calc, 1);
+               updateNode(5, mock(BatchExecUnion.class));
+               connect(1, 0);
+               connect(3, 2);
+               connect(4, 3);
+               connect(5, 1, 4);
+               connect(6, 5);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = 
ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(6)), 
finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1, 6);
+               assertSameShuffleStage(nodeShuffleStageMap, 2);
+               assertSameShuffleStage(nodeShuffleStageMap, 4);
+       }
+
+       /**
+        * Generates shuffle stages for a four-input union where the union itself has a final
+        * parallelism (5) equal to that of source 2 (5) and different from source 0 (11).
+        *
+        * <p>Expected stages: {0, 1} and {2, 3, 5, 6, 7}. The union (4) is not part of the
+        * expected stage node sets.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testWithFinalParallelism3() {
+               /**
+                *
+                *    0, Source    2, Source
+                *       |            |
+                *    1, Calc      3, Calc  6, Source   7,Source
+                *             \     /     /             /
+                *               4, Union
+                *                 |
+                *               5, Calc
+                */
+               createNodeList(8);
+               ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+               ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+               updateNode(0, scan0);
+               finalParallelismNodeMap.put(scan0, 11);
+               updateNode(2, scan1);
+               finalParallelismNodeMap.put(scan1, 5);
+               ExecNode<?, ?> union4 = mock(BatchExecUnion.class);
+               updateNode(4, union4);
+               finalParallelismNodeMap.put(union4, 5);
+               updateNode(5, mock(BatchExecCalc.class));
+               updateNode(6, mock(BatchExecTableSourceScan.class));
+               updateNode(7, mock(BatchExecTableSourceScan.class));
+               connect(1, 0);
+               connect(3, 2);
+               connect(4, 1, 3, 6, 7);
+               connect(5, 4);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = 
ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), 
finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+               assertSameShuffleStage(nodeShuffleStageMap, 2, 3, 6, 5, 7);
+       }
+
+       /**
+        * Generates shuffle stages for a two-input union where the union has a final parallelism
+        * (3) that differs from both sources (source 0: 11, source 2: 5).
+        *
+        * <p>Expected stages: {0, 1}, {2, 3} and {5}. The union (4) is not part of the expected
+        * stage node sets.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testWithFinalParallelism4() {
+               /**
+                *
+                *    0, Source    2, Source
+                *       |            |
+                *    1, Calc      3, Calc
+                *             \     /
+                *               4, Union
+                *                 |
+                *               5, Calc
+                */
+               createNodeList(6);
+               ExecNode<?, ?> scan0 = mock(BatchExecTableSourceScan.class);
+               ExecNode<?, ?> scan1 = mock(BatchExecTableSourceScan.class);
+               updateNode(0, scan0);
+               finalParallelismNodeMap.put(scan0, 11);
+               updateNode(2, scan1);
+               finalParallelismNodeMap.put(scan1, 5);
+               ExecNode<?, ?> union4 = mock(BatchExecUnion.class);
+               updateNode(4, union4);
+               finalParallelismNodeMap.put(union4, 3);
+               updateNode(5, mock(BatchExecCalc.class));
+               connect(1, 0);
+               connect(3, 2);
+               connect(4, 1, 3);
+               connect(5, 4);
+
+               Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap = 
ShuffleStageGenerator.generate(Arrays.asList(nodeList.get(5)), 
finalParallelismNodeMap);
+
+               assertSameShuffleStage(nodeShuffleStageMap, 0, 1);
+               assertSameShuffleStage(nodeShuffleStageMap, 2, 3);
+               assertSameShuffleStage(nodeShuffleStageMap, 5);
+       }
+
+       /**
+        * Asserts that every listed node is mapped to a shuffle stage whose node set is exactly
+        * the set of listed nodes.
+        *
+        * @param nodeShuffleStageMap mapping from node to its shuffle stage
+        * @param nodeIndexes indexes in {@code nodeList} of the nodes expected to share one stage
+        */
+       private void assertSameShuffleStage(Map<ExecNode<?, ?>, ShuffleStage> nodeShuffleStageMap, int ... nodeIndexes) {
+               Set<ExecNode<?, ?>> expectedNodes = new HashSet<>();
+               for (int i = 0; i < nodeIndexes.length; i++) {
+                       expectedNodes.add(nodeList.get(nodeIndexes[i]));
+               }
+               for (int i = 0; i < nodeIndexes.length; i++) {
+                       int nodeIndex = nodeIndexes[i];
+                       ShuffleStage stage = nodeShuffleStageMap.get(nodeList.get(nodeIndex));
+                       assertNotNull("shuffleStage should not be null. node index: " + nodeIndex, stage);
+                       assertEquals("node index: " + nodeIndex, expectedNodes, stage.getExecNodeSet());
+               }
+       }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.xml
new file mode 100644
index 0000000..a0ad41b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.xml
@@ -0,0 +1,185 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testConfigSourceParallelism[NONE]">
+    <Resource name="sql">
+      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c 
limit 2]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], 
resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], 
resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, 
Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS 
sum$0], resource=[{parallelism=100}])
+                  +- Calc(select=[c, a], resource=[{parallelism=100}])
+                     +- TableSourceScan(table=[[table3, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=100}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConfigSourceParallelism[ONLY_SOURCE]">
+    <Resource name="sql">
+      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c 
limit 2]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], 
resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], 
resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, 
Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS 
sum$0], resource=[{parallelism=5}])
+                  +- Calc(select=[c, a], resource=[{parallelism=5}])
+                     +- TableSourceScan(table=[[table3, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRangePartition[NONE]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM Table5 where d < 100 order by e]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Sort(orderBy=[e ASC], resource=[{parallelism=18}])
++- Exchange(distribution=[range[e ASC]], resource=[{parallelism=18}])
+   +- Calc(select=[d, e, f, g, h], where=[<(d, 100)], 
resource=[{parallelism=18}])
+      +- TableSourceScan(table=[[Table5, source: [TestTableSource(d, e, f, g, 
h)]]], fields=[d, e, f, g, h], resource=[{parallelism=18}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRangePartition[ONLY_SOURCE]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM Table5 where d < 100 order by e]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Sort(orderBy=[e ASC], resource=[{parallelism=8}])
++- Exchange(distribution=[range[e ASC]], resource=[{parallelism=8}])
+   +- Calc(select=[d, e, f, g, h], where=[<(d, 100)], 
resource=[{parallelism=8}])
+      +- TableSourceScan(table=[[Table5, source: [TestTableSource(d, e, f, g, 
h)]]], fields=[d, e, f, g, h], resource=[{parallelism=8}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSourcePartitionMaxNum[NONE]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM table3]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], 
fields=[a, b, c], resource=[{parallelism=18}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSortLimit[NONE]">
+    <Resource name="sql">
+      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c 
limit 2]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], 
resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], 
resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, 
Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS 
sum$0], resource=[{parallelism=18}])
+                  +- Calc(select=[c, a], resource=[{parallelism=18}])
+                     +- TableSourceScan(table=[[table3, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSortLimit[ONLY_SOURCE]">
+    <Resource name="sql">
+      <![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c 
limit 2]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[sum_a, c], resource=[{parallelism=1}])
++- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[true], 
resource=[{parallelism=1}])
+   +- Exchange(distribution=[single], resource=[{parallelism=-1}])
+      +- SortLimit(orderBy=[c ASC], offset=[0], fetch=[2], global=[false], 
resource=[{parallelism=18}])
+         +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, 
Final_SUM(sum$0) AS sum_a], resource=[{parallelism=18}])
+            +- Exchange(distribution=[hash[c]], resource=[{parallelism=-1}])
+               +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(a) AS 
sum$0], resource=[{parallelism=5}])
+                  +- Calc(select=[c, a], resource=[{parallelism=5}])
+                     +- TableSourceScan(table=[[table3, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnionQuery[NONE]">
+    <Resource name="sql">
+      <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 
UNION ALL SELECT a, b, c FROM table4), Table5 WHERE b = e group by g]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[sum_a, g], resource=[{parallelism=18}])
++- HashAggregate(isMerge=[true], groupBy=[g], select=[g, Final_SUM(sum$0) AS 
sum_a], resource=[{parallelism=18}])
+   +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
+      +- LocalHashAggregate(groupBy=[g], select=[g, Partial_SUM(a) AS sum$0], 
resource=[{parallelism=18}])
+         +- Calc(select=[g, a], resource=[{parallelism=18}])
+            +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, 
e, g], build=[left], resource=[{parallelism=18}])
+               :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
+               :  +- Union(all=[true], union=[a, b], 
resource=[{parallelism=-1}])
+               :     :- Calc(select=[a, b], resource=[{parallelism=18}])
+               :     :  +- TableSourceScan(table=[[table3, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+               :     +- Calc(select=[a, b], resource=[{parallelism=18}])
+               :        +- TableSourceScan(table=[[table4, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=18}])
+               +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
+                  +- Calc(select=[e, g], resource=[{parallelism=18}])
+                     +- TableSourceScan(table=[[Table5, source: 
[TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], 
resource=[{parallelism=18}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSourcePartitionMaxNum[ONLY_SOURCE]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM table3]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[table3, source: [TestTableSource(a, b, c)]]], 
fields=[a, b, c], resource=[{parallelism=2}])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnionQuery[ONLY_SOURCE]">
+    <Resource name="sql">
+      <![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 
UNION ALL SELECT a, b, c FROM table4), Table5 WHERE b = e group by g]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[sum_a, g], resource=[{parallelism=18}])
++- HashAggregate(isMerge=[true], groupBy=[g], select=[g, Final_SUM(sum$0) AS 
sum_a], resource=[{parallelism=18}])
+   +- Exchange(distribution=[hash[g]], resource=[{parallelism=-1}])
+      +- LocalHashAggregate(groupBy=[g], select=[g, Partial_SUM(a) AS sum$0], 
resource=[{parallelism=18}])
+         +- Calc(select=[g, a], resource=[{parallelism=18}])
+            +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[a, b, 
e, g], build=[left], resource=[{parallelism=18}])
+               :- Exchange(distribution=[hash[b]], resource=[{parallelism=-1}])
+               :  +- Union(all=[true], union=[a, b], 
resource=[{parallelism=-1}])
+               :     :- Calc(select=[a, b], resource=[{parallelism=5}])
+               :     :  +- TableSourceScan(table=[[table3, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+               :     +- Calc(select=[a, b], resource=[{parallelism=5}])
+               :        +- TableSourceScan(table=[[table4, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c], resource=[{parallelism=5}])
+               +- Exchange(distribution=[hash[e]], resource=[{parallelism=-1}])
+                  +- Calc(select=[e, g], resource=[{parallelism=8}])
+                     +- TableSourceScan(table=[[Table5, source: 
[TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h], 
resource=[{parallelism=8}])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.scala
new file mode 100644
index 0000000..8c55822
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/BatchExecResourceTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableConfigOptions, Types}
+import 
org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import java.util.{Arrays => JArrays, Collection => JCollection}
+
+/**
+  * Plan test for the resource (parallelism) assigned to the nodes of optimized batch plans.
+  *
+  * The class is parameterized by the resource infer mode supplied by the companion object.
+  * Each test case hands its query to `verifyResource`.
+  */
+@RunWith(classOf[Parameterized])
+class BatchExecResourceTest(inferMode: String) extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  /** Applies the infer mode under test, registers the sources and sets the shared config. */
+  @Before
+  def before(): Unit = {
+    util.getTableEnv.getConfig.getConf
+      .setString(TableConfigOptions.SQL_RESOURCE_INFER_MODE, inferMode)
+
+    // table3 is registered with table stats built from 5000000.
+    util.addTableSource(
+      "table3",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING),
+      Array("a", "b", "c"),
+      FlinkStatistic.builder().tableStats(new TableStats(5000000)).build())
+
+    // Table5 is registered with table stats built from 8000000.
+    util.addTableSource(
+      "Table5",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.INT, Types.STRING, Types.LONG),
+      Array("d", "e", "f", "g", "h"),
+      FlinkStatistic.builder().tableStats(new TableStats(8000000)).build())
+
+    BatchExecResourceTest.setResourceConfig(util.getTableEnv.getConfig)
+  }
+
+  /** Lowers the max inferred source parallelism to 2 before verifying a plain scan. */
+  @Test
+  def testSourcePartitionMaxNum(): Unit = {
+    util.getTableEnv.getConfig.getConf
+      .setInteger(TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX, 2)
+    util.verifyResource("SELECT * FROM table3")
+  }
+
+  /** Aggregation followed by a sort with limit, using the shared config only. */
+  @Test
+  def testSortLimit(): Unit = {
+    util.verifyResource(
+      "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2")
+  }
+
+  /** Same query as testSortLimit, with the source parallelism option set to 100. */
+  @Test
+  def testConfigSourceParallelism(): Unit = {
+    util.getTableEnv.getConfig.getConf
+      .setInteger(TableConfigOptions.SQL_RESOURCE_SOURCE_PARALLELISM, 100)
+    util.verifyResource(
+      "SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2")
+  }
+
+  /** Sort query verified with the range sort option enabled. */
+  @Test
+  // TODO check when range partition added.
+  def testRangePartition(): Unit = {
+    util.getTableEnv.getConfig.getConf
+      .setBoolean(TableConfigOptions.SQL_EXEC_SORT_RANGE_ENABLED, true)
+    util.verifyResource("SELECT * FROM Table5 where d < 100 order by e")
+  }
+
+  /** Union of table3 with an extra source table4, joined with Table5 and aggregated. */
+  @Test
+  def testUnionQuery(): Unit = {
+    // table4 is only needed by this case; its table stats are built from 100L.
+    util.addTableSource(
+      "table4",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING),
+      Array("a", "b", "c"),
+      FlinkStatistic.builder().tableStats(new TableStats(100L)).build())
+
+    util.verifyResource(
+      "SELECT sum(a) as sum_a, g FROM " +
+        "(SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), Table5 " +
+        "WHERE b = e group by g")
+  }
+}
+
+/** Parameter source and shared resource configuration for [[BatchExecResourceTest]]. */
+object BatchExecResourceTest {
+
+  /** Infer mode names the test class is instantiated with; the name is used as test label. */
+  @Parameterized.Parameters(name = "{0}")
+  def parameters(): JCollection[String] = {
+    val none = NodeResourceConfig.InferMode.NONE.toString
+    val onlySource = NodeResourceConfig.InferMode.ONLY_SOURCE.toString
+    JArrays.asList(none, onlySource)
+  }
+
+  /**
+    * Sets the resource options shared by the test cases: default parallelism 18, max inferred
+    * source parallelism 1000 and 1000000 rows per partition.
+    */
+  def setResourceConfig(tableConfig: TableConfig): Unit = {
+    tableConfig.getConf
+      .setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 18)
+    tableConfig.getConf
+      .setInteger(TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX, 1000)
+    tableConfig.getConf
+      .setLong(TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION, 1000000)
+  }
+}
+
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
index 1e6c3cf..e32fce4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -51,7 +51,9 @@ class TableScanITCase extends BatchTestBase {
           row("Mary", new JLong(1L), new JInt(1)),
           row("Bob", new JLong(2L), new JInt(3))
         )
-        execEnv.fromCollection(data).returns(getReturnType)
+        val dataStream = execEnv.fromCollection(data).returns(getReturnType)
+        dataStream.getTransformation.setMaxParallelism(1)
+        dataStream
       }
 
       override def getReturnType: TypeInformation[Row] = new 
RowTypeInfo(fieldTypes, fieldNames)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index fda5a3c..25fdc74 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -305,6 +305,18 @@ abstract class TableTestUtil(test: TableTestBase) {
     assertEqualsOrExpand("planAfter", actual.toString, expand = false)
   }
 
+  /**
+    * Checks the given query text and then verifies its optimized plan with resource
+    * printing enabled; row types, retract traits and the plan before optimization are
+    * not printed.
+    */
+  def verifyResource(sql: String): Unit = {
+    assertEqualsOrExpand("sql", sql)
+    doVerifyPlan(
+      getTableEnv.sqlQuery(sql),
+      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      withRowType = false,
+      withRetractTraits = false,
+      printPlanBefore = false,
+      printResource = true)
+  }
+
   def doVerifyPlan(
       table: Table,
       explainLevel: SqlExplainLevel,
@@ -323,13 +335,15 @@ abstract class TableTestUtil(test: TableTestBase) {
       explainLevel: SqlExplainLevel,
       withRowType: Boolean,
       withRetractTraits: Boolean,
-      printPlanBefore: Boolean): Unit = {
+      printPlanBefore: Boolean,
+      printResource: Boolean = false): Unit = {
     val relNode = table.asInstanceOf[TableImpl].getRelNode
     val optimizedPlan = getOptimizedPlan(
       Array(relNode),
       explainLevel,
       withRetractTraits = withRetractTraits,
-      withRowType = withRowType)
+      withRowType = withRowType,
+      withResource = printResource)
 
     if (printPlanBefore) {
       val planBefore = SystemUtils.LINE_SEPARATOR +
@@ -393,7 +407,8 @@ abstract class TableTestUtil(test: TableTestBase) {
       relNodes: Array[RelNode],
       explainLevel: SqlExplainLevel,
       withRetractTraits: Boolean,
-      withRowType: Boolean): String = {
+      withRowType: Boolean,
+      withResource: Boolean = false): String = {
     require(relNodes.nonEmpty)
     val tEnv = getTableEnv
     val optimizedRels = tEnv.optimize(relNodes)
@@ -405,7 +420,8 @@ abstract class TableTestUtil(test: TableTestBase) {
           optimizedNodes,
           detailLevel = explainLevel,
           withRetractTraits = withRetractTraits,
-          withOutputType = withRowType)
+          withOutputType = withRowType,
+          withResource = withResource)
       case _ =>
         optimizedRels.map { rel =>
           FlinkRelOptUtil.toString(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index c40f457..a6f744a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -50,7 +50,9 @@ class TestTableSourceWithTime[T](
   }
 
   override def getBoundedStream(streamEnv: StreamExecutionEnvironment): 
DataStreamSource[T] = {
-    streamEnv.fromCollection(values, returnType)
+    val dataStream = streamEnv.fromCollection(values, returnType)
+    dataStream.getTransformation.setMaxParallelism(1)
+    dataStream
   }
 
   override def getRowtimeAttributeDescriptors: 
util.List[RowtimeAttributeDescriptor] = {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index a0c3ec2..a693637 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -105,10 +105,17 @@ public class TableConfigOptions {
        //  Resource Options
        // 
------------------------------------------------------------------------
 
-       /**
-        * How many Bytes per MB.
-        */
-       public static final long SIZE_IN_MB =  1024L * 1024;
+       /**
+        * Resource inference mode. The default is "NONE"; the description names NONE and
+        * ONLY_SOURCE as the only accepted values.
+        */
+       public static final ConfigOption<String> SQL_RESOURCE_INFER_MODE =
+                       key("sql.resource.infer.mode")
+                                       .defaultValue("NONE")
+                                       .withDescription("Sets infer resource mode according to statistics. Only NONE, or ONLY_SOURCE can be set.\n" +
+                                                       "If set NONE, parallelism and memory of all node are set by config.\n" +
+                                                       "If set ONLY_SOURCE, only source parallelism is inferred according to statistics.\n");
+
+       /**
+        * Upper bound for the parallelism that is inferred for a source operator.
+        * The default is 1000.
+        */
+       public static final ConfigOption<Integer> 
SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX =
+                       key("sql.resource.infer.source.parallelism.max")
+                                       .defaultValue(1000)
+                                       .withDescription("Sets max infer 
parallelism for source operator.");
 
        public static final ConfigOption<Integer> 
SQL_RESOURCE_DEFAULT_PARALLELISM =
                        key("sql.resource.default.parallelism")
@@ -116,6 +123,18 @@ public class TableConfigOptions {
                                        .withDescription("Default parallelism 
of the job. If any node do not have special parallelism, use it." +
                                                        "Its default value is 
the num of cpu cores in the client host.");
 
+       /**
+        * Explicit parallelism for source nodes, used when the infer mode is NONE.
+        * The default is -1; per the description the default parallelism option applies
+        * when this option is not set.
+        */
+       public static final ConfigOption<Integer> SQL_RESOURCE_SOURCE_PARALLELISM =
+                       key("sql.resource.source.parallelism")
+                                       .defaultValue(-1)
+                                       // Reference the option keys, not the ConfigOption objects, so the text shows the key names.
+                                       .withDescription("Sets source parallelism if " + SQL_RESOURCE_INFER_MODE.key() + " is NONE. " +
+                                                       "If it is not set, use " + SQL_RESOURCE_DEFAULT_PARALLELISM.key() + " to set source parallelism.");
+
+       /**
+        * Explicit parallelism for sink nodes. The default is -1.
+        * NOTE(review): -1 presumably acts as the "not set" marker mentioned in the
+        * description - confirm against the code that reads this option.
+        */
+       public static final ConfigOption<Integer> SQL_RESOURCE_SINK_PARALLELISM 
=
+                       key("sql.resource.sink.parallelism")
+                                       .defaultValue(-1)
+                                       .withDescription("Sets sink parallelism 
if it is set. If it is not set, " +
+                                                       "sink nodes will chain 
with ahead nodes as far as possible.");
+
        public static final ConfigOption<Integer> 
SQL_RESOURCE_EXTERNAL_BUFFER_MEM =
                        key("sql.resource.external-buffer.memory.mb")
                                        .defaultValue(10)
@@ -225,10 +244,4 @@ public class TableConfigOptions {
                                        .defaultValue(1000000L)
                                        .withDescription("Sets how many rows 
one partition processes. We will infer parallelism according " +
                                                        "to input row count.");
-
-       public static final ConfigOption<Integer> 
SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MAX =
-                       key("sql.resource.infer.operator.parallelism.max")
-                                       .defaultValue(800)
-                                       .withDescription("Sets max parallelism 
for all operators.");
-
 }

Reply via email to