This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f42a92c  [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler
f42a92c is described below

commit f42a92ccfbace9871834dd039a4e2025926b190c
Author: Lijie Wang <[email protected]>
AuthorDate: Fri Jan 28 10:06:05 2022 +0800

    [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler
    
    This closes #18462.
---
 .../optimizer/plantranslate/JobGraphGenerator.java |   1 +
 .../executiongraph/DefaultExecutionGraph.java      |   5 +
 .../flink/runtime/executiongraph/Execution.java    |   8 +-
 .../runtime/executiongraph/ExecutionGraph.java     |   8 +
 .../runtime/executiongraph/ExecutionJobVertex.java |   5 +
 .../IntermediateResultPartition.java               |  11 +-
 .../executiongraph/VertexGroupComputeUtil.java     |  57 ++++
 .../flip1/LogicalPipelinedRegionComputeUtil.java   |   4 +-
 .../failover/flip1/PipelinedRegionComputeUtil.java |  33 +--
 .../SchedulingPipelinedRegionComputeUtil.java      |   9 +-
 .../org/apache/flink/runtime/jobgraph/JobEdge.java |  12 +
 .../jobgraph/topology/DefaultLogicalTopology.java  |   2 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |  11 +
 .../DefaultExecutionVertexOperations.java          |   3 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  57 +++-
 .../scheduler/DefaultSchedulerComponents.java      |  43 +--
 .../scheduler/ExecutionVertexOperations.java       |   2 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |   2 +-
 .../SlotSharingExecutionSlotAllocatorFactory.java  |   4 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      | 327 +++++++++++++++++++++
 .../AdaptiveBatchSchedulerFactory.java             | 197 +++++++++++++
 .../adaptivebatch/BlockingResultInfo.java          |  36 ++-
 .../adaptivebatch/forwardgroup/ForwardGroup.java   |  83 ++++++
 .../forwardgroup/ForwardGroupComputeUtil.java      |  98 ++++++
 .../runtime/util/SlotSelectionStrategyUtils.java   |  69 +++++
 .../executiongraph/ExecutionJobVertexTest.java     |  32 +-
 .../IntermediateResultPartitionTest.java           |   3 +-
 .../DefaultSchedulerComponentsFactoryTest.java     |  30 --
 .../runtime/scheduler/SchedulerTestingUtils.java   |  46 +--
 .../SsgNetworkMemoryCalculationUtilsTest.java      |   4 +-
 .../adaptive/StateTrackingMockExecutionGraph.java  |   5 +
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  | 181 ++++++++++++
 .../AdaptiveBatchSchedulerTestUtils.java           | 101 +++++++
 .../forwardgroup/ForwardGroupComputeUtilTest.java  | 205 +++++++++++++
 .../util/SlotSelectionStrategyUtilsTest.java       |  62 ++++
 .../api/graph/StreamingJobGraphGenerator.java      |   1 +
 .../scheduling/AdaptiveBatchSchedulerITCase.java   | 177 +++++++++++
 37 files changed, 1758 insertions(+), 176 deletions(-)

diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index b7f9755..0da624f 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1404,6 +1404,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
 
         edge.setShipStrategyName(shipStrategy);
         edge.setBroadcast(isBroadcast);
+        edge.setForward(channel.getShipStrategy() == ShipStrategyType.FORWARD);
         edge.setPreProcessingOperationName(localStrategy);
         edge.setOperatorLevelCachingDescription(caching);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 77f0e19..363aff9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -763,6 +763,11 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
     // 
--------------------------------------------------------------------------------------------
 
     @Override
+    public void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> 
vertices) {
+        executionTopology.notifyExecutionGraphUpdated(this, vertices);
+    }
+
+    @Override
     public void attachJobGraph(List<JobVertex> topologicallySorted) throws 
JobException {
         if (isDynamic) {
             attachJobGraph(topologicallySorted, Collections.emptyList());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 463c69f..c363878 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -704,8 +704,12 @@ public class Execution
     }
 
     private void updatePartitionConsumers(final IntermediateResultPartition 
partition) {
-        final ConsumerVertexGroup consumerVertexGroup = 
partition.getConsumerVertexGroup();
-        for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+        final Optional<ConsumerVertexGroup> consumerVertexGroup =
+                partition.getConsumerVertexGroupOptional();
+        if (!consumerVertexGroup.isPresent()) {
+            return;
+        }
+        for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
             final ExecutionVertex consumerVertex =
                     
vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
             final Execution consumer = 
consumerVertex.getCurrentExecutionAttempt();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3e6fb2d..b8299cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -220,4 +220,12 @@ public interface ExecutionGraph extends 
AccessExecutionGraph {
      *     first Execution with.
      */
     void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) 
throws JobException;
+
+    /**
+     * Notify that some job vertices have been newly initialized, execution 
graph will try to update
+     * scheduling topology.
+     *
+     * @param vertices The execution job vertices that are newly initialized.
+     */
+    void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index a5c1fd7..2dd8cdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -514,6 +514,11 @@ public class ExecutionJobVertex
         numExecutionVertexFinished--;
     }
 
+    public boolean isFinished() {
+        return isParallelismDecided()
+                && numExecutionVertexFinished == 
parallelismInfo.getParallelism();
+    }
+
     // 
--------------------------------------------------------------------------------------------
     //  Accumulators / Metrics
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 990516f..101339b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -25,8 +25,8 @@ import 
org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
 
 import java.util.List;
+import java.util.Optional;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 public class IntermediateResultPartition {
@@ -79,7 +79,14 @@ public class IntermediateResultPartition {
     }
 
     public ConsumerVertexGroup getConsumerVertexGroup() {
-        return 
checkNotNull(getEdgeManager().getConsumerVertexGroupForPartition(partitionId));
+        Optional<ConsumerVertexGroup> consumerVertexGroup = 
getConsumerVertexGroupOptional();
+        checkState(consumerVertexGroup.isPresent());
+        return consumerVertexGroup.get();
+    }
+
+    public Optional<ConsumerVertexGroup> getConsumerVertexGroupOptional() {
+        return Optional.ofNullable(
+                
getEdgeManager().getConsumerVertexGroupForPartition(partitionId));
     }
 
     public List<ConsumedPartitionGroup> getConsumedPartitionGroups() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexGroupComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexGroupComputeUtil.java
new file mode 100644
index 0000000..dc6f9d9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexGroupComputeUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/** Common utils for processing vertex groups. */
+public final class VertexGroupComputeUtil {
+
+    public static <V> Set<V> mergeVertexGroups(
+            final Set<V> group1, final Set<V> group2, final Map<V, Set<V>> 
vertexToGroup) {
+
+        // merge the smaller group into the larger one to reduce the cost
+        final Set<V> smallerSet;
+        final Set<V> largerSet;
+        if (group1.size() < group2.size()) {
+            smallerSet = group1;
+            largerSet = group2;
+        } else {
+            smallerSet = group2;
+            largerSet = group1;
+        }
+        for (V v : smallerSet) {
+            vertexToGroup.put(v, largerSet);
+        }
+        largerSet.addAll(smallerSet);
+        return largerSet;
+    }
+
+    public static <V> Set<Set<V>> uniqueVertexGroups(final Map<V, Set<V>> 
vertexToGroup) {
+        final Set<Set<V>> distinctGroups = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        distinctGroups.addAll(vertexToGroup.values());
+        return distinctGroups;
+    }
+
+    private VertexGroupComputeUtil() {}
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
index f47476b..35f6789 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
@@ -27,8 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil.uniqueVertexGroups;
 import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
-import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
 
 /** Utils for computing {@link LogicalPipelinedRegion}s. */
 public final class LogicalPipelinedRegionComputeUtil {
@@ -43,7 +43,7 @@ public final class LogicalPipelinedRegionComputeUtil {
 
         // Since LogicalTopology is a DAG, there is no need to do cycle 
detection nor to merge
         // regions on cycles.
-        return uniqueRegions(vertexToRegion);
+        return uniqueVertexGroups(vertexToRegion);
     }
 
     private static Iterable<LogicalResult> getNonReconnectableConsumedResults(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index e111418..65c659c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -19,10 +19,10 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
 import org.apache.flink.runtime.topology.Result;
 import org.apache.flink.runtime.topology.Vertex;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
@@ -67,7 +67,9 @@ public final class PipelinedRegionComputeUtil {
                 // this check can significantly reduce compute complexity in 
All-to-All
                 // PIPELINED edge case
                 if (currentRegion != producerRegion) {
-                    currentRegion = mergeRegions(currentRegion, 
producerRegion, vertexToRegion);
+                    currentRegion =
+                            VertexGroupComputeUtil.mergeVertexGroups(
+                                    currentRegion, producerRegion, 
vertexToRegion);
                 }
             }
         }
@@ -75,32 +77,5 @@ public final class PipelinedRegionComputeUtil {
         return vertexToRegion;
     }
 
-    static <V extends Vertex<?, ?, V, ?>> Set<V> mergeRegions(
-            final Set<V> region1, final Set<V> region2, final Map<V, Set<V>> 
vertexToRegion) {
-
-        // merge the smaller region into the larger one to reduce the cost
-        final Set<V> smallerSet;
-        final Set<V> largerSet;
-        if (region1.size() < region2.size()) {
-            smallerSet = region1;
-            largerSet = region2;
-        } else {
-            smallerSet = region2;
-            largerSet = region1;
-        }
-        for (V v : smallerSet) {
-            vertexToRegion.put(v, largerSet);
-        }
-        largerSet.addAll(smallerSet);
-        return largerSet;
-    }
-
-    static <V extends Vertex<?, ?, V, ?>> Set<Set<V>> uniqueRegions(
-            final Map<V, Set<V>> vertexToRegion) {
-        final Set<Set<V>> distinctRegions = Collections.newSetFromMap(new 
IdentityHashMap<>());
-        distinctRegions.addAll(vertexToRegion.values());
-        return distinctRegions;
-    }
-
     private PipelinedRegionComputeUtil() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
index a44d8cf..ffe781f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
@@ -36,9 +36,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 
+import static 
org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil.mergeVertexGroups;
+import static 
org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil.uniqueVertexGroups;
 import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
-import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
-import static 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Utils for computing {@link SchedulingPipelinedRegion}s. */
@@ -73,7 +73,7 @@ public final class SchedulingPipelinedRegionComputeUtil {
                     executionVertexRetriever) {
 
         final List<Set<SchedulingExecutionVertex>> regionList =
-                new ArrayList<>(uniqueRegions(vertexToRegion));
+                new ArrayList<>(uniqueVertexGroups(vertexToRegion));
         final List<List<Integer>> outEdges =
                 buildOutEdgesDesc(vertexToRegion, regionList, 
executionVertexRetriever);
         final Set<Set<Integer>> sccs =
@@ -88,7 +88,8 @@ public final class SchedulingPipelinedRegionComputeUtil {
             Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
             for (int regionIndex : scc) {
                 mergedRegion =
-                        mergeRegions(mergedRegion, 
regionList.get(regionIndex), vertexToRegion);
+                        mergeVertexGroups(
+                                mergedRegion, regionList.get(regionIndex), 
vertexToRegion);
             }
             mergedRegions.add(mergedRegion);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
index 8392c32..9772ff4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
@@ -57,6 +57,8 @@ public class JobEdge implements java.io.Serializable {
 
     private boolean isBroadcast;
 
+    private boolean isForward;
+
     /**
      * Optional name for the pre-processing operation (sort, combining sort, 
...), to be displayed
      * in the JSON plan.
@@ -176,6 +178,16 @@ public class JobEdge implements java.io.Serializable {
         isBroadcast = broadcast;
     }
 
+    /** Gets whether the edge is forward edge. */
+    public boolean isForward() {
+        return isForward;
+    }
+
+    /** Sets whether the edge is forward edge. */
+    public void setForward(boolean forward) {
+        isForward = forward;
+    }
+
     /**
      * Gets the channel state rescaler used for rescaling persisted data on 
downstream side of this
      * JobEdge.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
index 49024fe..4ba0bf6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
@@ -91,7 +91,7 @@ public class DefaultLogicalTopology implements 
LogicalTopology {
         return verticesSorted;
     }
 
-    private DefaultLogicalVertex getVertex(final JobVertexID vertexId) {
+    public DefaultLogicalVertex getVertex(final JobVertexID vertexId) {
         return Optional.ofNullable(idToVertexMap.get(vertexId))
                 .orElseThrow(
                         () -> new IllegalArgumentException("can not find 
vertex: " + vertexId));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 03300ed..58795fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory;
+import 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.clock.SystemClock;
 
@@ -178,6 +179,16 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                         new DeclarativeSlotPoolServiceFactory(
                                 SystemClock.getInstance(), slotIdleTimeout, 
rpcTimeout);
                 break;
+            case AdaptiveBatch:
+                schedulerNGFactory = new AdaptiveBatchSchedulerFactory();
+                slotPoolServiceFactory =
+                        new DeclarativeSlotPoolBridgeServiceFactory(
+                                SystemClock.getInstance(),
+                                rpcTimeout,
+                                slotIdleTimeout,
+                                batchSlotTimeout,
+                                getRequestSlotMatchingStrategy(configuration, 
jobType));
+                break;
             default:
                 throw new IllegalArgumentException(
                         String.format(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
index 89034c7..d7454c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
@@ -24,7 +24,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 
 import java.util.concurrent.CompletableFuture;
 
-class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
+/** Default implementation of {@link ExecutionVertexOperations}. */
+public class DefaultExecutionVertexOperations implements 
ExecutionVertexOperations {
 
     @Override
     public void deploy(final ExecutionVertex executionVertex) throws 
JobException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index fba1bfa..1d0f3b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -85,7 +85,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /** The future default scheduler. */
 public class DefaultScheduler extends SchedulerBase implements 
SchedulerOperations {
 
-    private final Logger log;
+    protected final Logger log;
 
     private final ClassLoader userCodeLoader;
 
@@ -95,7 +95,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
     private final ScheduledExecutor delayExecutor;
 
-    private final SchedulingStrategy schedulingStrategy;
+    protected final SchedulingStrategy schedulingStrategy;
 
     private final ExecutionVertexOperations executionVertexOperations;
 
@@ -136,6 +136,57 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
             final ShuffleMaster<?> shuffleMaster,
             final Time rpcTimeout)
             throws Exception {
+        this(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionVertexOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStore(jobGraph));
+    }
+
+    protected DefaultScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismStore vertexParallelismStore)
+            throws Exception {
 
         super(
                 log,
@@ -151,7 +202,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
                 mainThreadExecutor,
                 jobStatusListener,
                 executionGraphFactory,
-                computeVertexParallelismStore(jobGraph));
+                vertexParallelismStore);
 
         this.log = log;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
index 0898c84..db72afa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
@@ -19,28 +19,21 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.jobgraph.JobType;
-import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
 import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
-import 
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
 import org.apache.flink.util.clock.SystemClock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -50,7 +43,6 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * PipelinedRegionSchedulingStrategy}.
  */
 public class DefaultSchedulerComponents {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSchedulerComponents.class);
 
     private final SchedulingStrategyFactory schedulingStrategyFactory;
     private final Consumer<ComponentMainThreadExecutor> startUpAction;
@@ -99,7 +91,8 @@ public class DefaultSchedulerComponents {
             final Time slotRequestTimeout) {
 
         final SlotSelectionStrategy slotSelectionStrategy =
-                selectSlotSelectionStrategy(jobType, jobMasterConfiguration);
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        jobType, jobMasterConfiguration);
         final PhysicalSlotRequestBulkChecker bulkChecker =
                 PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
                         slotPool, SystemClock.getInstance());
@@ -116,34 +109,4 @@ public class DefaultSchedulerComponents {
                 bulkChecker::start,
                 allocatorFactory);
     }
-
-    @VisibleForTesting
-    static SlotSelectionStrategy selectSlotSelectionStrategy(
-            final JobType jobType, final Configuration configuration) {
-        final boolean evenlySpreadOutSlots =
-                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
-
-        final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
-
-        locationPreferenceSlotSelectionStrategy =
-                evenlySpreadOutSlots
-                        ? 
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
-                        : 
LocationPreferenceSlotSelectionStrategy.createDefault();
-
-        final boolean isLocalRecoveryEnabled =
-                configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
-        if (isLocalRecoveryEnabled) {
-            if (jobType == JobType.STREAMING) {
-                return PreviousAllocationSlotSelectionStrategy.create(
-                        locationPreferenceSlotSelectionStrategy);
-            } else {
-                LOG.warn(
-                        "Batch job does not support local recovery. Falling 
back to use "
-                                + 
locationPreferenceSlotSelectionStrategy.getClass());
-                return locationPreferenceSlotSelectionStrategy;
-            }
-        } else {
-            return locationPreferenceSlotSelectionStrategy;
-        }
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
index 1dedf15..11cd1c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import java.util.concurrent.CompletableFuture;
 
 /** Operations on the {@link ExecutionVertex}. */
-interface ExecutionVertexOperations {
+public interface ExecutionVertexOperations {
 
     void deploy(ExecutionVertex executionVertex) throws JobException;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 70d4ab2..561e07a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -152,7 +152,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
 
     private final ExecutionGraphHandler executionGraphHandler;
 
-    private final OperatorCoordinatorHandler operatorCoordinatorHandler;
+    protected final OperatorCoordinatorHandler operatorCoordinatorHandler;
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
index 2fc3bb3..5edcb3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecke
 import 
org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
 
 /** Factory for {@link SlotSharingExecutionSlotAllocator}. */
-class SlotSharingExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocatorFactory {
+public class SlotSharingExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocatorFactory {
     private final PhysicalSlotProvider slotProvider;
 
     private final boolean slotWillBeOccupiedIndefinitely;
@@ -35,7 +35,7 @@ class SlotSharingExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocator
 
     private final SlotSharingStrategy.Factory slotSharingStrategyFactory;
 
-    SlotSharingExecutionSlotAllocatorFactory(
+    public SlotSharingExecutionSlotAllocatorFactory(
             PhysicalSlotProvider slotProvider,
             boolean slotWillBeOccupiedIndefinitely,
             PhysicalSlotRequestBulkChecker bulkChecker,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
new file mode 100644
index 0000000..1403fb4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import 
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
+import 
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data 
volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements 
SchedulerOperations {
+
+    private final DefaultLogicalTopology logicalTopology;
+
+    private final VertexParallelismDecider vertexParallelismDecider;
+
+    private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
+
+    AdaptiveBatchScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            int defaultMaxParallelism)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionVertexOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStoreForDynamicGraph(
+                        jobGraph.getVertices(), defaultMaxParallelism));
+
+        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
+
+        this.vertexParallelismDecider = vertexParallelismDecider;
+
+        this.forwardGroupsByJobVertexId =
+                ForwardGroupComputeUtil.computeForwardGroups(
+                        jobGraph.getVerticesSortedTopologicallyFromSources(),
+                        getExecutionGraph()::getJobVertex);
+    }
+
+    @Override
+    public void startSchedulingInternal() {
+        initializeVerticesIfPossible();
+
+        super.startSchedulingInternal();
+    }
+
+    @Override
+    protected void updateTaskExecutionStateInternal(
+            final ExecutionVertexID executionVertexId,
+            final TaskExecutionStateTransition taskExecutionState) {
+
+        initializeVerticesIfPossible();
+
+        super.updateTaskExecutionStateInternal(executionVertexId, 
taskExecutionState);
+    }
+
+    private void initializeVerticesIfPossible() {
+        final List<ExecutionJobVertex> newlyInitializedJobVertices = new 
ArrayList<>();
+        try {
+            final long createTimestamp = System.currentTimeMillis();
+            for (ExecutionJobVertex jobVertex : 
getExecutionGraph().getVerticesTopologically()) {
+                maybeSetParallelism(jobVertex);
+            }
+            for (ExecutionJobVertex jobVertex : 
getExecutionGraph().getVerticesTopologically()) {
+                if (canInitialize(jobVertex)) {
+                    getExecutionGraph().initializeJobVertex(jobVertex, 
createTimestamp);
+                    newlyInitializedJobVertices.add(jobVertex);
+                }
+            }
+        } catch (JobException ex) {
+            log.error("Unexpected error occurred when initializing 
ExecutionJobVertex", ex);
+            failJob(ex, System.currentTimeMillis());
+        }
+
+        if (newlyInitializedJobVertices.size() > 0) {
+            updateTopology(newlyInitializedJobVertices);
+        }
+    }
+
+    private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isParallelismDecided()) {
+            return;
+        }
+
+        Optional<List<BlockingResultInfo>> consumedResultsInfo =
+                tryGetConsumedResultsInfo(jobVertex);
+        if (!consumedResultsInfo.isPresent()) {
+            return;
+        }
+
+        ForwardGroup forwardGroup = 
forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId());
+        int parallelism;
+
+        if (forwardGroup != null && forwardGroup.isParallelismDecided()) {
+            parallelism = forwardGroup.getParallelism();
+            log.info(
+                    "Parallelism of JobVertex: {} ({}) is decided to be {} 
according to forward group's parallelism.",
+                    jobVertex.getName(),
+                    jobVertex.getJobVertexId(),
+                    parallelism);
+
+        } else {
+            parallelism =
+                    
vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo.get());
+            if (forwardGroup != null) {
+                forwardGroup.setParallelism(parallelism);
+            }
+
+            log.info(
+                    "Parallelism of JobVertex: {} ({}) is decided to be {}.",
+                    jobVertex.getName(),
+                    jobVertex.getJobVertexId(),
+                    parallelism);
+        }
+
+        changeJobVertexParallelism(jobVertex, parallelism);
+    }
+
+    private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int 
parallelism) {
+        // update the JSON Plan, it's needed to enable REST APIs to return the 
latest parallelism of
+        // job
+        // vertices
+        jobVertex.getJobVertex().setParallelism(parallelism);
+        try {
+            
getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph()));
+        } catch (Throwable t) {
+            log.warn("Cannot create JSON plan for job", t);
+            // give the graph an empty plan
+            getExecutionGraph().setJsonPlan("{}");
+        }
+
+        jobVertex.setParallelism(parallelism);
+    }
+
+    /** Get information of consumable results. */
+    private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(
+            final ExecutionJobVertex jobVertex) {
+
+        List<BlockingResultInfo> consumableResultInfo = new ArrayList<>();
+
+        DefaultLogicalVertex logicalVertex = 
logicalTopology.getVertex(jobVertex.getJobVertexId());
+        Iterable<DefaultLogicalResult> consumedResults = 
logicalVertex.getConsumedResults();
+
+        for (DefaultLogicalResult consumedResult : consumedResults) {
+            final ExecutionJobVertex producerVertex =
+                    
getExecutionJobVertex(consumedResult.getProducer().getId());
+            if (producerVertex.isFinished()) {
+                IntermediateResult intermediateResult =
+                        
getExecutionGraph().getAllIntermediateResults().get(consumedResult.getId());
+                checkNotNull(intermediateResult);
+
+                consumableResultInfo.add(
+                        
BlockingResultInfo.createFromIntermediateResult(intermediateResult));
+            } else {
+                // not all inputs consumable, return Optional.empty()
+                return Optional.empty();
+            }
+        }
+
+        return Optional.of(consumableResultInfo);
+    }
+
+    private boolean canInitialize(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided()) {
+            return false;
+        }
+
+        // all the upstream job vertices need to have been initialized
+        for (JobEdge inputEdge : jobVertex.getJobVertex().getInputs()) {
+            final ExecutionJobVertex producerVertex =
+                    
getExecutionGraph().getJobVertex(inputEdge.getSource().getProducer().getID());
+            checkNotNull(producerVertex);
+            if (!producerVertex.isInitialized()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private void updateTopology(final List<ExecutionJobVertex> 
newlyInitializedJobVertices) {
+        for (ExecutionJobVertex vertex : newlyInitializedJobVertices) {
+            initializeOperatorCoordinatorsFor(vertex);
+        }
+
+        // notify execution graph updated, and try to update the execution 
topology.
+        
getExecutionGraph().notifyNewlyInitializedJobVertices(newlyInitializedJobVertices);
+    }
+
+    private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
+        operatorCoordinatorHandler.registerAndStartNewCoordinators(
+                vertex.getOperatorCoordinators(), getMainThreadExecutor());
+    }
+
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a 
dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid 
parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore 
computeVertexParallelismStoreForDynamicGraph(
+            Iterable<JobVertex> vertices, int defaultMaxParallelism) {
+        // for dynamic graph, there is no need to normalize vertex 
parallelism. if the max
+        // parallelism is not configured and the parallelism is a positive 
value, max
+        // parallelism can be computed against the parallelism, otherwise it 
needs to use the
+        // global default max parallelism.
+        return computeVertexParallelismStore(
+                vertices,
+                v -> {
+                    if (v.getParallelism() > 0) {
+                        return getDefaultMaxParallelism(v);
+                    } else {
+                        return defaultMaxParallelism;
+                    }
+                },
+                Function.identity());
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
new file mode 100644
index 0000000..541c56e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
+import 
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Factory for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
+
+    @Override
+    public SchedulerNG createInstance(
+            Logger log,
+            JobGraph jobGraph,
+            Executor ioExecutor,
+            Configuration jobMasterConfiguration,
+            SlotPoolService slotPoolService,
+            ScheduledExecutorService futureExecutor,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            Time slotRequestTimeout,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws Exception {
+
+        checkState(
+                jobGraph.getJobType() == JobType.BATCH,
+                "Adaptive batch scheduler only supports batch jobs");
+        checkAllExchangesBlocking(jobGraph);
+
+        final SlotPool slotPool =
+                slotPoolService
+                        .castInto(SlotPool.class)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The DefaultScheduler requires 
a SlotPool."));
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.BATCH, jobMasterConfiguration);
+        final PhysicalSlotRequestBulkChecker bulkChecker =
+                PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
+                        slotPool, SystemClock.getInstance());
+        final PhysicalSlotProvider physicalSlotProvider =
+                new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+        final ExecutionSlotAllocatorFactory allocatorFactory =
+                new SlotSharingExecutionSlotAllocatorFactory(
+                        physicalSlotProvider, false, bulkChecker, 
slotRequestTimeout);
+
+        final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+                
RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                                jobGraph.getSerializedExecutionConfig()
+                                        .deserializeValue(userCodeLoader)
+                                        .getRestartStrategy(),
+                                jobMasterConfiguration,
+                                jobGraph.isCheckpointingEnabled())
+                        .create();
+        log.info(
+                "Using restart back off time strategy {} for {} ({}).",
+                restartBackoffTimeStrategy,
+                jobGraph.getName(),
+                jobGraph.getJobID());
+
+        final ExecutionGraphFactory executionGraphFactory =
+                new DefaultExecutionGraphFactory(
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        executionDeploymentTracker,
+                        futureExecutor,
+                        ioExecutor,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        shuffleMaster,
+                        partitionTracker,
+                        true);
+
+        return new AdaptiveBatchScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                bulkChecker::start,
+                new ScheduledExecutorServiceAdapter(futureExecutor),
+                userCodeLoader,
+                new CheckpointsCleaner(),
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
+                restartBackoffTimeStrategy,
+                new DefaultExecutionVertexOperations(),
+                new ExecutionVertexVersioner(),
+                allocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                jobMasterConfiguration.getInteger(
+                        
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
+    }
+
+    private void checkAllExchangesBlocking(final JobGraph jobGraph) {
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            for (IntermediateDataSet dataSet : 
jobVertex.getProducedDataSets()) {
+                checkState(
+                        dataSet.getResultType().isBlocking(),
+                        String.format(
+                                "At the moment, adaptive batch scheduler 
requires batch workloads "
+                                        + "to be executed with types of all 
edges being BLOCKING. "
+                                        + "To do that, you need to configure 
'%s' to '%s'.",
+                                ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+            }
+        }
+    }
+
+    @Override
+    public JobManagerOptions.SchedulerType getSchedulerType() {
+        return JobManagerOptions.SchedulerType.AdaptiveBatch;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
index 1248fd1..11a45a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
@@ -18,8 +18,18 @@
 
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+
+import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** The blocking result info, which will be used to calculate the vertex 
parallelism. */
 public class BlockingResultInfo {
 
@@ -40,12 +50,32 @@ public class BlockingResultInfo {
         return isBroadcast;
     }
 
-    public static BlockingResultInfo createFromBroadcastResult(List<Long> 
blockingPartitionSizes) {
+    @VisibleForTesting
+    static BlockingResultInfo createFromBroadcastResult(List<Long> 
blockingPartitionSizes) {
         return new BlockingResultInfo(blockingPartitionSizes, true);
     }
 
-    public static BlockingResultInfo createFromNonBroadcastResult(
-            List<Long> blockingPartitionSizes) {
+    @VisibleForTesting
+    static BlockingResultInfo createFromNonBroadcastResult(List<Long> 
blockingPartitionSizes) {
         return new BlockingResultInfo(blockingPartitionSizes, false);
     }
+
+    public static BlockingResultInfo createFromIntermediateResult(
+            IntermediateResult intermediateResult) {
+        checkArgument(intermediateResult != null);
+
+        List<Long> blockingPartitionSizes = new ArrayList<>();
+        for (IntermediateResultPartition partition : 
intermediateResult.getPartitions()) {
+            checkState(partition.isConsumable());
+
+            IOMetrics ioMetrics =
+                    
partition.getProducer().getCurrentExecutionAttempt().getIOMetrics();
+            checkNotNull(ioMetrics, "IOMetrics should not be null.");
+
+            blockingPartitionSizes.add(
+                    
ioMetrics.getNumBytesProducedOfPartitions().get(partition.getPartitionId()));
+        }
+
+        return new BlockingResultInfo(blockingPartitionSizes, 
intermediateResult.isBroadcast());
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroup.java
new file mode 100644
index 0000000..fa5dffc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroup.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A forward group is a set of job vertices connected via forward edges. 
Parallelisms of all job
+ * vertices in the same {@link ForwardGroup} must be the same.
+ */
+public class ForwardGroup {
+
+    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+    private final Set<JobVertexID> jobVertexIds = new HashSet<>();
+
+    public ForwardGroup(final Set<ExecutionJobVertex> jobVertices) {
+        checkNotNull(jobVertices);
+
+        Set<Integer> decidedParallelisms =
+                jobVertices.stream()
+                        .filter(
+                                jobVertex -> {
+                                    
jobVertexIds.add(jobVertex.getJobVertexId());
+                                    return jobVertex.isParallelismDecided();
+                                })
+                        .map(ExecutionJobVertex::getParallelism)
+                        .collect(Collectors.toSet());
+
+        checkState(decidedParallelisms.size() <= 1);
+        if (decidedParallelisms.size() == 1) {
+            this.parallelism = decidedParallelisms.iterator().next();
+        }
+    }
+
+    public void setParallelism(int parallelism) {
+        checkState(this.parallelism == ExecutionConfig.PARALLELISM_DEFAULT);
+        this.parallelism = parallelism;
+    }
+
+    public boolean isParallelismDecided() {
+        return parallelism > 0;
+    }
+
+    public int getParallelism() {
+        checkState(isParallelismDecided());
+        return parallelism;
+    }
+
+    public int size() {
+        return jobVertexIds.size();
+    }
+
+    public Set<JobVertexID> getJobVertexIds() {
+        return jobVertexIds;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtil.java
new file mode 100644
index 0000000..28491cf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;
+
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Common utils for computing forward groups. */
+public class ForwardGroupComputeUtil {
+
+    public static Map<JobVertexID, ForwardGroup> computeForwardGroups(
+            final Iterable<JobVertex> topologicallySortedVertices,
+            Function<JobVertexID, ExecutionJobVertex> 
executionJobVertexRetriever) {
+
+        final Map<JobVertex, Set<JobVertex>> vertexToGroup = new 
IdentityHashMap<>();
+
+        // iterate all the vertices which are topologically sorted
+        for (JobVertex vertex : topologicallySortedVertices) {
+            Set<JobVertex> currentGroup = new HashSet<>();
+            currentGroup.add(vertex);
+            vertexToGroup.put(vertex, currentGroup);
+
+            for (JobEdge input : getForwardInputs(vertex)) {
+                final JobVertex producerVertex = 
input.getSource().getProducer();
+                final Set<JobVertex> producerGroup = 
vertexToGroup.get(producerVertex);
+
+                if (producerGroup == null) {
+                    throw new IllegalStateException(
+                            "Producer task "
+                                    + producerVertex.getID()
+                                    + " forward group is null"
+                                    + " while calculating forward group for 
the consumer task "
+                                    + vertex.getID()
+                                    + ". This should be a forward group 
building bug.");
+                }
+
+                if (currentGroup != producerGroup) {
+                    currentGroup =
+                            VertexGroupComputeUtil.mergeVertexGroups(
+                                    currentGroup, producerGroup, 
vertexToGroup);
+                }
+            }
+        }
+
+        final Map<JobVertexID, ForwardGroup> ret = new HashMap<>();
+        for (Set<JobVertex> vertexGroup :
+                VertexGroupComputeUtil.uniqueVertexGroups(vertexToGroup)) {
+            if (vertexGroup.size() > 1) {
+                ForwardGroup forwardGroup =
+                        new ForwardGroup(
+                                vertexGroup.stream()
+                                        .map(
+                                                vertex ->
+                                                        
executionJobVertexRetriever.apply(
+                                                                
vertex.getID()))
+                                        .collect(Collectors.toSet()));
+                for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) 
{
+                    ret.put(jobVertexId, forwardGroup);
+                }
+            }
+        }
+        return ret;
+    }
+
+    static Iterable<JobEdge> getForwardInputs(JobVertex jobVertex) {
+        return jobVertex.getInputs().stream()
+                .filter(JobEdge::isForward)
+                .collect(Collectors.toSet());
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
new file mode 100644
index 0000000..3eb7a9f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobType;
+import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for selecting {@link SlotSelectionStrategy}. */
+public class SlotSelectionStrategyUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SlotSelectionStrategyUtils.class);
+
+    public static SlotSelectionStrategy selectSlotSelectionStrategy(
+            final JobType jobType, final Configuration configuration) {
+        final boolean evenlySpreadOutSlots =
+                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+        final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
+
+        locationPreferenceSlotSelectionStrategy =
+                evenlySpreadOutSlots
+                        ? 
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
+                        : 
LocationPreferenceSlotSelectionStrategy.createDefault();
+
+        final boolean isLocalRecoveryEnabled =
+                configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
+        if (isLocalRecoveryEnabled) {
+            if (jobType == JobType.STREAMING) {
+                return PreviousAllocationSlotSelectionStrategy.create(
+                        locationPreferenceSlotSelectionStrategy);
+            } else {
+                LOG.warn(
+                        "Batch job does not support local recovery. Falling 
back to use "
+                                + 
locationPreferenceSlotSelectionStrategy.getClass());
+                return locationPreferenceSlotSelectionStrategy;
+            }
+        } else {
+            return locationPreferenceSlotSelectionStrategy;
+        }
+    }
+
+    /** Private default constructor to avoid being instantiated. */
+    private SlotSelectionStrategyUtils() {}
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
index 240427b..3ec1bb0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -24,15 +24,14 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
-import java.util.function.Function;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static org.hamcrest.CoreMatchers.is;
@@ -210,38 +209,11 @@ public class ExecutionJobVertexTest {
 
         final DefaultExecutionGraph eg = 
TestingDefaultExecutionGraphBuilder.newBuilder().build();
         final VertexParallelismStore vertexParallelismStore =
-                computeVertexParallelismStoreForDynamicGraph(
+                
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
                         Collections.singletonList(jobVertex), 
defaultMaxParallelism);
         final VertexParallelismInformation vertexParallelismInfo =
                 vertexParallelismStore.getParallelismInfo(jobVertex.getID());
 
         return new ExecutionJobVertex(eg, jobVertex, vertexParallelismInfo);
     }
-
-    /**
-     * Compute the {@link VertexParallelismStore} for all given vertices in a 
dynamic graph, which
-     * will set defaults and ensure that the returned store contains valid 
parallelisms, with the
-     * configured default max parallelism.
-     *
-     * @param vertices the vertices to compute parallelism for
-     * @param defaultMaxParallelism the global default max parallelism
-     * @return the computed parallelism store
-     */
-    public static VertexParallelismStore 
computeVertexParallelismStoreForDynamicGraph(
-            Iterable<JobVertex> vertices, int defaultMaxParallelism) {
-        // for dynamic graph, there is no need to normalize vertex 
parallelism. if the max
-        // parallelism is not configured and the parallelism is a positive 
value, max
-        // parallelism can be computed against the parallelism, otherwise it 
needs to use the
-        // global default max parallelism.
-        return SchedulerBase.computeVertexParallelismStore(
-                vertices,
-                v -> {
-                    if (v.getParallelism() > 0) {
-                        return SchedulerBase.getDefaultMaxParallelism(v);
-                    } else {
-                        return defaultMaxParallelism;
-                    }
-                },
-                Function.identity());
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index 611e18a..56be6d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -270,7 +271,7 @@ public class IntermediateResultPartitionTest extends 
TestLogger {
     public static VertexParallelismStore 
computeVertexParallelismStoreConsideringDynamicGraph(
             Iterable<JobVertex> vertices, boolean isDynamicGraph, int 
defaultMaxParallelism) {
         if (isDynamicGraph) {
-            return 
ExecutionJobVertexTest.computeVertexParallelismStoreForDynamicGraph(
+            return 
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
                     vertices, defaultMaxParallelism);
         } else {
             return SchedulerBase.computeVertexParallelismStore(vertices);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
index 85e225b..f459c54 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
@@ -20,14 +20,10 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobType;
-import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
-import 
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.util.TestLogger;
 
@@ -81,32 +77,6 @@ public class DefaultSchedulerComponentsFactoryTest extends 
TestLogger {
         }
     }
 
-    @Test
-    public void 
testCreatePreviousAllocationSlotSelectionStrategyForLocalRecoveryStreamingJob() 
{
-        final Configuration configuration = new Configuration();
-        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
-
-        final SlotSelectionStrategy slotSelectionStrategy =
-                DefaultSchedulerComponents.selectSlotSelectionStrategy(
-                        JobType.STREAMING, configuration);
-
-        assertThat(
-                slotSelectionStrategy, 
instanceOf(PreviousAllocationSlotSelectionStrategy.class));
-    }
-
-    @Test
-    public void 
testCreateLocationPreferenceSlotSelectionStrategyForLocalRecoveryBatchJob() {
-        final Configuration configuration = new Configuration();
-        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
-
-        final SlotSelectionStrategy slotSelectionStrategy =
-                DefaultSchedulerComponents.selectSlotSelectionStrategy(
-                        JobType.BATCH, configuration);
-
-        assertThat(
-                slotSelectionStrategy, 
instanceOf(LocationPreferenceSlotSelectionStrategy.class));
-    }
-
     private static DefaultSchedulerComponents createSchedulerComponents(
             final Configuration configuration) {
         return createSchedulerComponents(configuration, false, JobType.BATCH);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index d214d80..aeb8bf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -387,39 +387,41 @@ public class SchedulerTestingUtils {
 
     /** Builder for {@link DefaultScheduler}. */
     public static class DefaultSchedulerBuilder {
-        private final JobGraph jobGraph;
+        protected final JobGraph jobGraph;
 
-        private final ComponentMainThreadExecutor mainThreadExecutor;
+        protected final ComponentMainThreadExecutor mainThreadExecutor;
 
-        private SchedulingStrategyFactory schedulingStrategyFactory =
+        protected SchedulingStrategyFactory schedulingStrategyFactory =
                 new PipelinedRegionSchedulingStrategy.Factory();
 
-        private Logger log = LOG;
-        private Executor ioExecutor = TestingUtils.defaultExecutor();
-        private Configuration jobMasterConfiguration = new Configuration();
-        private ScheduledExecutorService futureExecutor = 
TestingUtils.defaultExecutor();
-        private ScheduledExecutor delayExecutor =
+        protected Logger log = LOG;
+        protected Executor ioExecutor = TestingUtils.defaultExecutor();
+        protected Configuration jobMasterConfiguration = new Configuration();
+        protected ScheduledExecutorService futureExecutor = 
TestingUtils.defaultExecutor();
+        protected ScheduledExecutor delayExecutor =
                 new ScheduledExecutorServiceAdapter(futureExecutor);
-        private ClassLoader userCodeLoader = 
ClassLoader.getSystemClassLoader();
-        private CheckpointsCleaner checkpointCleaner = new 
CheckpointsCleaner();
-        private CheckpointRecoveryFactory checkpointRecoveryFactory =
+        protected ClassLoader userCodeLoader = 
ClassLoader.getSystemClassLoader();
+        protected CheckpointsCleaner checkpointCleaner = new 
CheckpointsCleaner();
+        protected CheckpointRecoveryFactory checkpointRecoveryFactory =
                 new StandaloneCheckpointRecoveryFactory();
-        private Time rpcTimeout = DEFAULT_TIMEOUT;
-        private BlobWriter blobWriter = VoidBlobWriter.getInstance();
-        private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+        protected Time rpcTimeout = DEFAULT_TIMEOUT;
+        protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
+        protected JobManagerJobMetricGroup jobManagerJobMetricGroup =
                 
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
-        private ShuffleMaster<?> shuffleMaster = 
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
-        private JobMasterPartitionTracker partitionTracker = 
NoOpJobMasterPartitionTracker.INSTANCE;
-        private FailoverStrategy.Factory failoverStrategyFactory =
+        protected ShuffleMaster<?> shuffleMaster = 
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
+        protected JobMasterPartitionTracker partitionTracker =
+                NoOpJobMasterPartitionTracker.INSTANCE;
+        protected FailoverStrategy.Factory failoverStrategyFactory =
                 new RestartPipelinedRegionFailoverStrategy.Factory();
-        private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+        protected RestartBackoffTimeStrategy restartBackoffTimeStrategy =
                 NoRestartBackoffTimeStrategy.INSTANCE;
-        private ExecutionVertexOperations executionVertexOperations =
+        protected ExecutionVertexOperations executionVertexOperations =
                 new DefaultExecutionVertexOperations();
-        private ExecutionVertexVersioner executionVertexVersioner = new 
ExecutionVertexVersioner();
-        private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
+        protected ExecutionVertexVersioner executionVertexVersioner =
+                new ExecutionVertexVersioner();
+        protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
                 new TestExecutionSlotAllocatorFactory();
-        private JobStatusListener jobStatusListener = (ignoredA, ignoredB, 
ignoredC) -> {};
+        protected JobStatusListener jobStatusListener = (ignoredA, ignoredB, 
ignoredC) -> {};
 
         public DefaultSchedulerBuilder(
                 final JobGraph jobGraph, ComponentMainThreadExecutor 
mainThreadExecutor) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
index 9481c82..2c1bf64 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartitionTest;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -208,7 +208,7 @@ public class SsgNetworkMemoryCalculationUtilsTest {
         JobGraph jobGraph = createBatchGraph(slotSharingGroups, 
Arrays.asList(4, -1, -1));
 
         final VertexParallelismStore vertexParallelismStore =
-                
ExecutionJobVertexTest.computeVertexParallelismStoreForDynamicGraph(
+                
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
                         jobGraph.getVertices(), defaultMaxParallelism);
 
         return TestingDefaultExecutionGraphBuilder.newBuilder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index 26f2933..1eda992 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -365,4 +365,9 @@ class StateTrackingMockExecutionGraph implements 
ExecutionGraph {
             throws JobException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> 
vertices) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
new file mode 100644
index 0000000..ab8a8e9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Test for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerTest extends TestLogger {
+
+    private static final int SOURCE_PARALLELISM_1 = 6;
+    private static final int SOURCE_PARALLELISM_2 = 4;
+
+    private static final ComponentMainThreadExecutor mainThreadExecutor =
+            ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+    @Test
+    public void testAdaptiveBatchScheduler() throws Exception {
+        JobGraph jobGraph = createJobGraph(false);
+        Iterator<JobVertex> jobVertexIterator = 
jobGraph.getVertices().iterator();
+        JobVertex source1 = jobVertexIterator.next();
+        JobVertex source2 = jobVertexIterator.next();
+        JobVertex sink = jobVertexIterator.next();
+
+        SchedulerBase scheduler = createScheduler(jobGraph);
+
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) 
scheduler.getExecutionGraph();
+        final ExecutionJobVertex sinkExecutionJobVertex = 
graph.getJobVertex(sink.getID());
+
+        scheduler.startScheduling();
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+        // trigger source1 finished.
+        transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+        // trigger source2 finished.
+        transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(10));
+
+        // check that the jobGraph is updated
+        assertThat(sink.getParallelism(), is(10));
+    }
+
+    @Test
+    public void testDecideParallelismForForwardTarget() throws Exception {
+        JobGraph jobGraph = createJobGraph(true);
+        Iterator<JobVertex> jobVertexIterator = 
jobGraph.getVertices().iterator();
+        JobVertex source1 = jobVertexIterator.next();
+        JobVertex source2 = jobVertexIterator.next();
+        JobVertex sink = jobVertexIterator.next();
+
+        SchedulerBase scheduler = createScheduler(jobGraph);
+
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) 
scheduler.getExecutionGraph();
+        final ExecutionJobVertex sinkExecutionJobVertex = 
graph.getJobVertex(sink.getID());
+
+        scheduler.startScheduling();
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+        // trigger source1 finished.
+        transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
+        assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+        // trigger source2 finished.
+        transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
+        assertThat(sinkExecutionJobVertex.getParallelism(), 
is(SOURCE_PARALLELISM_1));
+
+        // check that the jobGraph is updated
+        assertThat(sink.getParallelism(), is(SOURCE_PARALLELISM_1));
+    }
+
+    /** Transit the state of all executions. */
+    public static void transitionExecutionsState(
+            final SchedulerBase scheduler, final ExecutionState state, 
List<Execution> executions) {
+        for (Execution execution : executions) {
+            scheduler.updateTaskExecutionState(
+                    new TaskExecutionState(
+                            execution.getAttemptId(),
+                            state,
+                            null,
+                            null,
+                            new IOMetrics(0, 0, 0, 0)));
+        }
+    }
+
+    /** Transit the state of all executions in the Job Vertex. */
+    public static void transitionExecutionsState(
+            final SchedulerBase scheduler, final ExecutionState state, final 
JobVertex jobVertex) {
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        List<Execution> executions =
+                
Arrays.asList(executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices())
+                        .stream()
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .collect(Collectors.toList());
+        transitionExecutionsState(scheduler, state, executions);
+    }
+
+    public JobVertex createJobVertex(String jobVertexName, int parallelism) {
+        final JobVertex jobVertex = new JobVertex(jobVertexName);
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        if (parallelism > 0) {
+            jobVertex.setParallelism(parallelism);
+        }
+        return jobVertex;
+    }
+
+    public JobGraph createJobGraph(boolean withForwardEdge) {
+        final JobVertex source1 = createJobVertex("source1", 
SOURCE_PARALLELISM_1);
+        final JobVertex source2 = createJobVertex("source2", 
SOURCE_PARALLELISM_2);
+        final JobVertex sink = createJobVertex("sink", -1);
+        sink.connectNewDataSetAsInput(
+                source1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+        sink.connectNewDataSetAsInput(
+                source2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+        if (withForwardEdge) {
+            
source1.getProducedDataSets().get(0).getConsumer().setForward(true);
+        }
+        return new JobGraph(new JobID(), "test job", source1, source2, sink);
+    }
+
+    public SchedulerBase createScheduler(JobGraph jobGraph) throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.AdaptiveBatch);
+
+        final AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder 
schedulerBuilder =
+                (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder)
+                        new 
AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+                                        jobGraph, mainThreadExecutor)
+                                .setJobMasterConfiguration(configuration);
+        schedulerBuilder.setJobVertexParallelismDecider((ignored) -> 10);
+
+        return schedulerBuilder.build();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
new file mode 100644
index 0000000..116cb24
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import 
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+
+/** A utility class to create {@link AdaptiveBatchScheduler} instances for 
testing. */
+public class AdaptiveBatchSchedulerTestUtils {
+
+    /** Builder for {@link AdaptiveBatchScheduler}. */
+    public static class AdaptiveBatchSchedulerBuilder
+            extends SchedulerTestingUtils.DefaultSchedulerBuilder {
+
+        private VertexParallelismDecider vertexParallelismDecider = (ignored) 
-> 0;
+
+        private int defaultMaxParallelism =
+                
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
+
+        public AdaptiveBatchSchedulerBuilder(
+                JobGraph jobGraph, ComponentMainThreadExecutor 
mainThreadExecutor) {
+            super(jobGraph, mainThreadExecutor);
+            setSchedulingStrategyFactory(new 
VertexwiseSchedulingStrategy.Factory());
+        }
+
+        public void setJobVertexParallelismDecider(
+                VertexParallelismDecider jobVertexParallelismDecider) {
+            this.vertexParallelismDecider = jobVertexParallelismDecider;
+        }
+
+        public void setDefaultMaxParallelism(int defaultMaxParallelism) {
+            this.defaultMaxParallelism = defaultMaxParallelism;
+        }
+
+        @Override
+        public AdaptiveBatchScheduler build() throws Exception {
+            final ExecutionGraphFactory executionGraphFactory =
+                    new DefaultExecutionGraphFactory(
+                            jobMasterConfiguration,
+                            userCodeLoader,
+                            new DefaultExecutionDeploymentTracker(),
+                            futureExecutor,
+                            ioExecutor,
+                            rpcTimeout,
+                            jobManagerJobMetricGroup,
+                            blobWriter,
+                            shuffleMaster,
+                            partitionTracker,
+                            true);
+
+            return new AdaptiveBatchScheduler(
+                    log,
+                    jobGraph,
+                    ioExecutor,
+                    jobMasterConfiguration,
+                    componentMainThreadExecutor -> {},
+                    delayExecutor,
+                    userCodeLoader,
+                    checkpointCleaner,
+                    checkpointRecoveryFactory,
+                    jobManagerJobMetricGroup,
+                    schedulingStrategyFactory,
+                    failoverStrategyFactory,
+                    restartBackoffTimeStrategy,
+                    executionVertexOperations,
+                    executionVertexVersioner,
+                    executionSlotAllocatorFactory,
+                    System.currentTimeMillis(),
+                    mainThreadExecutor,
+                    jobStatusListener,
+                    executionGraphFactory,
+                    shuffleMaster,
+                    rpcTimeout,
+                    vertexParallelismDecider,
+                    defaultMaxParallelism);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
new file mode 100644
index 0000000..b0d19f1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link ForwardGroupComputeUtil}. */
+public class ForwardGroupComputeUtilTest extends TestLogger {
+    /**
+     * Tests that the computation of the job graph with isolated vertices 
works correctly.
+     *
+     * <pre>
+     *     (v1)
+     *
+     *     (v2)
+     *
+     *     (v3)
+     * </pre>
+     */
+    @Test
+    public void testIsolatedVertices() throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        JobVertex v2 = new JobVertex("v2");
+        JobVertex v3 = new JobVertex("v3");
+
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3);
+
+        checkGroupSize(groups, 0);
+    }
+
+    /**
+     * Tests that the computation of the vertices connected with edges which 
have various result
+     * partition types works correctly.
+     *
+     * <pre>
+     *
+     *     (v1) -> (v2) -> (v3)
+     *
+     * </pre>
+     */
+    @Test
+    public void testVariousResultPartitionTypesBetweenVertices() throws 
Exception {
+        testThreeVerticesConnectSequentially(false, true, 1, 2);
+        testThreeVerticesConnectSequentially(false, false, 0);
+        testThreeVerticesConnectSequentially(true, true, 1, 3);
+    }
+
+    private void testThreeVerticesConnectSequentially(
+            boolean isForward1, boolean isForward2, int numOfGroups, int... 
groupSizes)
+            throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        JobVertex v2 = new JobVertex("v2");
+        JobVertex v3 = new JobVertex("v3");
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+        if (isForward1) {
+            v1.getProducedDataSets().get(0).getConsumer().setForward(true);
+        }
+
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+
+        if (isForward2) {
+            v2.getProducedDataSets().get(0).getConsumer().setForward(true);
+        }
+
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3);
+
+        checkGroupSize(groups, numOfGroups, groupSizes);
+    }
+
+    /**
+     * Tests that the computation of the job graph where two upstream vertices 
connect with one
+     * downstream vertex works correctly.
+     *
+     * <pre>
+     *
+     *     (v1) --
+     *           |
+     *           --> (v3) -> (v4)
+     *           |
+     *     (v2) --
+     *
+     * </pre>
+     */
+    @Test
+    public void testTwoInputsMergesIntoOne() throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        JobVertex v2 = new JobVertex("v2");
+        JobVertex v3 = new JobVertex("v3");
+        JobVertex v4 = new JobVertex("v4");
+
+        v3.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+        v1.getProducedDataSets().get(0).getConsumer().setForward(true);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+        v2.getProducedDataSets().get(0).getConsumer().setForward(true);
+        v4.connectNewDataSetAsInput(
+                v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
+
+        checkGroupSize(groups, 1, 3);
+    }
+
+    /**
+     * Tests that the computation of the job graph where one upstream vertex 
connect with two
+     * downstream vertices works correctly.
+     *
+     * <pre>
+     *
+     *                    --> (v3)
+     *                    |
+     *      (v1) -> (v2) --
+     *                    |
+     *                    --> (v4)
+     *
+     * </pre>
+     */
+    @Test
+    public void testOneInputSplitsIntoTwo() throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        JobVertex v2 = new JobVertex("v2");
+        JobVertex v3 = new JobVertex("v3");
+        JobVertex v4 = new JobVertex("v4");
+
+        v2.connectNewDataSetAsInput(
+                v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+        v3.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+        v4.connectNewDataSetAsInput(
+                v2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+        v2.getProducedDataSets().get(0).getConsumer().setForward(true);
+        v2.getProducedDataSets().get(1).getConsumer().setForward(true);
+
+        Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
+
+        checkGroupSize(groups, 1, 3);
+    }
+
+    private static Set<ForwardGroup> computeForwardGroups(JobVertex... 
vertices) throws Exception {
+        Arrays.asList(vertices).forEach(vertex -> 
vertex.setInvokableClass(NoOpInvokable.class));
+        ExecutionGraph executionGraph = createDynamicGraph(vertices);
+        return new HashSet<>(
+                ForwardGroupComputeUtil.computeForwardGroups(
+                                Arrays.asList(vertices), 
executionGraph::getJobVertex)
+                        .values());
+    }
+
+    private static void checkGroupSize(Set<ForwardGroup> groups, int 
numOfGroups, int... sizes) {
+        assertEquals(numOfGroups, groups.size());
+        containsInAnyOrder(
+                
groups.stream().map(ForwardGroup::size).collect(Collectors.toList()), sizes);
+    }
+
+    private static DefaultExecutionGraph createDynamicGraph(JobVertex... 
vertices)
+            throws Exception {
+
+        TestingDefaultExecutionGraphBuilder builder =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(new JobGraph(new JobID(), "TestJob", 
vertices))
+                        .setVertexParallelismStore(
+                                
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
+                                        Arrays.asList(vertices), 10));
+        return builder.buildDynamicGraph();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtilsTest.java
new file mode 100644
index 0000000..ba9bdb8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobType;
+import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import 
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+/** Test for {@link SlotSelectionStrategyUtils}. */
+public class SlotSelectionStrategyUtilsTest extends TestLogger {
+
+    @Test
+    public void 
testCreatePreviousAllocationSlotSelectionStrategyForLocalRecoveryStreamingJob() 
{
+        final Configuration configuration = new Configuration();
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.STREAMING, configuration);
+
+        assertThat(
+                slotSelectionStrategy, 
instanceOf(PreviousAllocationSlotSelectionStrategy.class));
+    }
+
+    @Test
+    public void 
testCreateLocationPreferenceSlotSelectionStrategyForLocalRecoveryBatchJob() {
+        final Configuration configuration = new Configuration();
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+        final SlotSelectionStrategy slotSelectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.BATCH, configuration);
+
+        assertThat(
+                slotSelectionStrategy, 
instanceOf(LocationPreferenceSlotSelectionStrategy.class));
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index af1f27d..af9994d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -943,6 +943,7 @@ public class StreamingJobGraphGenerator {
         // set strategy name so that web interface can show it.
         jobEdge.setShipStrategyName(partitioner.toString());
         jobEdge.setBroadcast(partitioner.isBroadcast());
+        jobEdge.setForward(partitioner instanceof ForwardPartitioner);
         
jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
         
jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
new file mode 100644
index 0000000..10d5400
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT case for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerITCase extends TestLogger {
+
+    private static final int DEFAULT_MAX_PARALLELISM = 4;
+    private static final int SOURCE_PARALLELISM_1 = 2;
+    private static final int SOURCE_PARALLELISM_2 = 8;
+    private static final int NUMBERS_TO_PRODUCE = 10000;
+
+    private static ConcurrentLinkedQueue<Map<Long, Long>> numberCountResults;
+
+    private Map<Long, Long> expectedResult;
+
+    @Before
+    public void setUp() {
+        expectedResult =
+                LongStream.range(0, NUMBERS_TO_PRODUCE)
+                        .boxed()
+                        .collect(Collectors.toMap(Function.identity(), i -> 
2L));
+
+        numberCountResults = new ConcurrentLinkedQueue<>();
+    }
+
+    @Test
+    public void testSchedulingWithUnknownResource() throws Exception {
+        testScheduling(false);
+    }
+
+    @Test
+    public void testSchedulingWithFineGrainedResource() throws Exception {
+        testScheduling(true);
+    }
+
+    public void testScheduling(Boolean isFineGrained) throws Exception {
+        executeJob(isFineGrained);
+
+        Map<Long, Long> numberCountResultMap =
+                numberCountResults.stream()
+                        .flatMap(map -> map.entrySet().stream())
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        Map.Entry::getValue,
+                                        (v1, v2) -> v1 + v2));
+
+        for (int i = 0; i < NUMBERS_TO_PRODUCE; i++) {
+            if (numberCountResultMap.get(i) != expectedResult.get(i)) {
+                System.out.println(i + ": " + numberCountResultMap.get(i));
+            }
+        }
+        assertThat(numberCountResultMap, equalTo(expectedResult));
+    }
+
+    private void executeJob(Boolean isFineGrained) throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.setString(RestOptions.BIND_PORT, "0");
+        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+        configuration.set(
+                JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.AdaptiveBatch);
+        configuration.setInteger(
+                JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM,
+                DEFAULT_MAX_PARALLELISM);
+        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
MemorySize.parse("4kb"));
+        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+
+        if (isFineGrained) {
+            
configuration.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, true);
+            
configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);
+        }
+
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.createLocalEnvironment(configuration);
+        env.setParallelism(-1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
+
+        for (int i = 0; i < 3; ++i) {
+            SlotSharingGroup group;
+            if (isFineGrained) {
+                group =
+                        SlotSharingGroup.newBuilder("group" + i)
+                                .setCpuCores(1.0)
+                                .setTaskHeapMemory(MemorySize.parse("100m"))
+                                .build();
+            } else {
+                group = SlotSharingGroup.newBuilder("group" + i).build();
+            }
+            slotSharingGroups.add(group);
+        }
+
+        final DataStream<Long> source1 =
+                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                        .setParallelism(SOURCE_PARALLELISM_1)
+                        .name("source1")
+                        .slotSharingGroup(slotSharingGroups.get(0));
+
+        final DataStream<Long> source2 =
+                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                        .setParallelism(SOURCE_PARALLELISM_2)
+                        .name("source2")
+                        .slotSharingGroup(slotSharingGroups.get(1));
+
+        source1.union(source2)
+                .rescale()
+                .map(new NumberCounter())
+                .name("map")
+                .slotSharingGroup(slotSharingGroups.get(2));
+
+        env.execute();
+    }
+
+    private static class NumberCounter extends RichMapFunction<Long, Long> {
+
+        private final Map<Long, Long> numberCountResult = new HashMap<>();
+
+        @Override
+        public Long map(Long value) throws Exception {
+            numberCountResult.put(value, numberCountResult.getOrDefault(value, 
0L) + 1L);
+
+            return value;
+        }
+
+        @Override
+        public void close() throws Exception {
+            numberCountResults.add(numberCountResult);
+        }
+    }
+}

Reply via email to