This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f42a92c [FLINK-25045][runtime] Introduce AdaptiveBatchScheduler
f42a92c is described below
commit f42a92ccfbace9871834dd039a4e2025926b190c
Author: Lijie Wang <[email protected]>
AuthorDate: Fri Jan 28 10:06:05 2022 +0800
[FLINK-25045][runtime] Introduce AdaptiveBatchScheduler
This closes #18462.
---
.../optimizer/plantranslate/JobGraphGenerator.java | 1 +
.../executiongraph/DefaultExecutionGraph.java | 5 +
.../flink/runtime/executiongraph/Execution.java | 8 +-
.../runtime/executiongraph/ExecutionGraph.java | 8 +
.../runtime/executiongraph/ExecutionJobVertex.java | 5 +
.../IntermediateResultPartition.java | 11 +-
.../executiongraph/VertexGroupComputeUtil.java | 57 ++++
.../flip1/LogicalPipelinedRegionComputeUtil.java | 4 +-
.../failover/flip1/PipelinedRegionComputeUtil.java | 33 +--
.../SchedulingPipelinedRegionComputeUtil.java | 9 +-
.../org/apache/flink/runtime/jobgraph/JobEdge.java | 12 +
.../jobgraph/topology/DefaultLogicalTopology.java | 2 +-
.../DefaultSlotPoolServiceSchedulerFactory.java | 11 +
.../DefaultExecutionVertexOperations.java | 3 +-
.../flink/runtime/scheduler/DefaultScheduler.java | 57 +++-
.../scheduler/DefaultSchedulerComponents.java | 43 +--
.../scheduler/ExecutionVertexOperations.java | 2 +-
.../flink/runtime/scheduler/SchedulerBase.java | 2 +-
.../SlotSharingExecutionSlotAllocatorFactory.java | 4 +-
.../adaptivebatch/AdaptiveBatchScheduler.java | 327 +++++++++++++++++++++
.../AdaptiveBatchSchedulerFactory.java | 197 +++++++++++++
.../adaptivebatch/BlockingResultInfo.java | 36 ++-
.../adaptivebatch/forwardgroup/ForwardGroup.java | 83 ++++++
.../forwardgroup/ForwardGroupComputeUtil.java | 98 ++++++
.../runtime/util/SlotSelectionStrategyUtils.java | 69 +++++
.../executiongraph/ExecutionJobVertexTest.java | 32 +-
.../IntermediateResultPartitionTest.java | 3 +-
.../DefaultSchedulerComponentsFactoryTest.java | 30 --
.../runtime/scheduler/SchedulerTestingUtils.java | 46 +--
.../SsgNetworkMemoryCalculationUtilsTest.java | 4 +-
.../adaptive/StateTrackingMockExecutionGraph.java | 5 +
.../adaptivebatch/AdaptiveBatchSchedulerTest.java | 181 ++++++++++++
.../AdaptiveBatchSchedulerTestUtils.java | 101 +++++++
.../forwardgroup/ForwardGroupComputeUtilTest.java | 205 +++++++++++++
.../util/SlotSelectionStrategyUtilsTest.java | 62 ++++
.../api/graph/StreamingJobGraphGenerator.java | 1 +
.../scheduling/AdaptiveBatchSchedulerITCase.java | 177 +++++++++++
37 files changed, 1758 insertions(+), 176 deletions(-)
diff --git
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index b7f9755..0da624f 100644
---
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1404,6 +1404,7 @@ public class JobGraphGenerator implements
Visitor<PlanNode> {
edge.setShipStrategyName(shipStrategy);
edge.setBroadcast(isBroadcast);
+ edge.setForward(channel.getShipStrategy() == ShipStrategyType.FORWARD);
edge.setPreProcessingOperationName(localStrategy);
edge.setOperatorLevelCachingDescription(caching);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 77f0e19..363aff9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -763,6 +763,11 @@ public class DefaultExecutionGraph implements
ExecutionGraph, InternalExecutionG
//
--------------------------------------------------------------------------------------------
@Override
+ public void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex>
vertices) {
+ executionTopology.notifyExecutionGraphUpdated(this, vertices);
+ }
+
+ @Override
public void attachJobGraph(List<JobVertex> topologicallySorted) throws
JobException {
if (isDynamic) {
attachJobGraph(topologicallySorted, Collections.emptyList());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 463c69f..c363878 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -704,8 +704,12 @@ public class Execution
}
private void updatePartitionConsumers(final IntermediateResultPartition
partition) {
- final ConsumerVertexGroup consumerVertexGroup =
partition.getConsumerVertexGroup();
- for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+ final Optional<ConsumerVertexGroup> consumerVertexGroup =
+ partition.getConsumerVertexGroupOptional();
+ if (!consumerVertexGroup.isPresent()) {
+ return;
+ }
+ for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
final ExecutionVertex consumerVertex =
vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
final Execution consumer =
consumerVertex.getCurrentExecutionAttempt();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3e6fb2d..b8299cd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -220,4 +220,12 @@ public interface ExecutionGraph extends
AccessExecutionGraph {
* first Execution with.
*/
void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
throws JobException;
+
+ /**
+ * Notify that some job vertices have been newly initialized; the execution
graph will try to update
+ * the scheduling topology.
+ *
+ * @param vertices The execution job vertices that are newly initialized.
+ */
+ void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index a5c1fd7..2dd8cdc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -514,6 +514,11 @@ public class ExecutionJobVertex
numExecutionVertexFinished--;
}
+ public boolean isFinished() {
+ return isParallelismDecided()
+ && numExecutionVertexFinished ==
parallelismInfo.getParallelism();
+ }
+
//
--------------------------------------------------------------------------------------------
// Accumulators / Metrics
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 990516f..101339b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -25,8 +25,8 @@ import
org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import java.util.List;
+import java.util.Optional;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
public class IntermediateResultPartition {
@@ -79,7 +79,14 @@ public class IntermediateResultPartition {
}
public ConsumerVertexGroup getConsumerVertexGroup() {
- return
checkNotNull(getEdgeManager().getConsumerVertexGroupForPartition(partitionId));
+ Optional<ConsumerVertexGroup> consumerVertexGroup =
getConsumerVertexGroupOptional();
+ checkState(consumerVertexGroup.isPresent());
+ return consumerVertexGroup.get();
+ }
+
+ public Optional<ConsumerVertexGroup> getConsumerVertexGroupOptional() {
+ return Optional.ofNullable(
+
getEdgeManager().getConsumerVertexGroupForPartition(partitionId));
}
public List<ConsumedPartitionGroup> getConsumedPartitionGroups() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexGroupComputeUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexGroupComputeUtil.java
new file mode 100644
index 0000000..dc6f9d9
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexGroupComputeUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/** Common utils for processing vertex groups. */
+public final class VertexGroupComputeUtil {
+
+ public static <V> Set<V> mergeVertexGroups(
+ final Set<V> group1, final Set<V> group2, final Map<V, Set<V>>
vertexToGroup) {
+
+ // merge the smaller group into the larger one to reduce the cost
+ final Set<V> smallerSet;
+ final Set<V> largerSet;
+ if (group1.size() < group2.size()) {
+ smallerSet = group1;
+ largerSet = group2;
+ } else {
+ smallerSet = group2;
+ largerSet = group1;
+ }
+ for (V v : smallerSet) {
+ vertexToGroup.put(v, largerSet);
+ }
+ largerSet.addAll(smallerSet);
+ return largerSet;
+ }
+
+ public static <V> Set<Set<V>> uniqueVertexGroups(final Map<V, Set<V>>
vertexToGroup) {
+ final Set<Set<V>> distinctGroups = Collections.newSetFromMap(new
IdentityHashMap<>());
+ distinctGroups.addAll(vertexToGroup.values());
+ return distinctGroups;
+ }
+
+ private VertexGroupComputeUtil() {}
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
index f47476b..35f6789 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
@@ -27,8 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil.uniqueVertexGroups;
import static
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
-import static
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
/** Utils for computing {@link LogicalPipelinedRegion}s. */
public final class LogicalPipelinedRegionComputeUtil {
@@ -43,7 +43,7 @@ public final class LogicalPipelinedRegionComputeUtil {
// Since LogicalTopology is a DAG, there is no need to do cycle
detection nor to merge
// regions on cycles.
- return uniqueRegions(vertexToRegion);
+ return uniqueVertexGroups(vertexToRegion);
}
private static Iterable<LogicalResult> getNonReconnectableConsumedResults(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index e111418..65c659c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -19,10 +19,10 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
+import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
import org.apache.flink.runtime.topology.Result;
import org.apache.flink.runtime.topology.Vertex;
-import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
@@ -67,7 +67,9 @@ public final class PipelinedRegionComputeUtil {
// this check can significantly reduce compute complexity in
All-to-All
// PIPELINED edge case
if (currentRegion != producerRegion) {
- currentRegion = mergeRegions(currentRegion,
producerRegion, vertexToRegion);
+ currentRegion =
+ VertexGroupComputeUtil.mergeVertexGroups(
+ currentRegion, producerRegion,
vertexToRegion);
}
}
}
@@ -75,32 +77,5 @@ public final class PipelinedRegionComputeUtil {
return vertexToRegion;
}
- static <V extends Vertex<?, ?, V, ?>> Set<V> mergeRegions(
- final Set<V> region1, final Set<V> region2, final Map<V, Set<V>>
vertexToRegion) {
-
- // merge the smaller region into the larger one to reduce the cost
- final Set<V> smallerSet;
- final Set<V> largerSet;
- if (region1.size() < region2.size()) {
- smallerSet = region1;
- largerSet = region2;
- } else {
- smallerSet = region2;
- largerSet = region1;
- }
- for (V v : smallerSet) {
- vertexToRegion.put(v, largerSet);
- }
- largerSet.addAll(smallerSet);
- return largerSet;
- }
-
- static <V extends Vertex<?, ?, V, ?>> Set<Set<V>> uniqueRegions(
- final Map<V, Set<V>> vertexToRegion) {
- final Set<Set<V>> distinctRegions = Collections.newSetFromMap(new
IdentityHashMap<>());
- distinctRegions.addAll(vertexToRegion.values());
- return distinctRegions;
- }
-
private PipelinedRegionComputeUtil() {}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
index a44d8cf..ffe781f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
@@ -36,9 +36,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import static
org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil.mergeVertexGroups;
+import static
org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil.uniqueVertexGroups;
import static
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
-import static
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
-import static
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
import static org.apache.flink.util.Preconditions.checkState;
/** Utils for computing {@link SchedulingPipelinedRegion}s. */
@@ -73,7 +73,7 @@ public final class SchedulingPipelinedRegionComputeUtil {
executionVertexRetriever) {
final List<Set<SchedulingExecutionVertex>> regionList =
- new ArrayList<>(uniqueRegions(vertexToRegion));
+ new ArrayList<>(uniqueVertexGroups(vertexToRegion));
final List<List<Integer>> outEdges =
buildOutEdgesDesc(vertexToRegion, regionList,
executionVertexRetriever);
final Set<Set<Integer>> sccs =
@@ -88,7 +88,8 @@ public final class SchedulingPipelinedRegionComputeUtil {
Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
for (int regionIndex : scc) {
mergedRegion =
- mergeRegions(mergedRegion,
regionList.get(regionIndex), vertexToRegion);
+ mergeVertexGroups(
+ mergedRegion, regionList.get(regionIndex),
vertexToRegion);
}
mergedRegions.add(mergedRegion);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
index 8392c32..9772ff4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
@@ -57,6 +57,8 @@ public class JobEdge implements java.io.Serializable {
private boolean isBroadcast;
+ private boolean isForward;
+
/**
* Optional name for the pre-processing operation (sort, combining sort,
...), to be displayed
* in the JSON plan.
@@ -176,6 +178,16 @@ public class JobEdge implements java.io.Serializable {
isBroadcast = broadcast;
}
+ /** Gets whether the edge is a forward edge. */
+ public boolean isForward() {
+ return isForward;
+ }
+
+ /** Sets whether the edge is a forward edge. */
+ public void setForward(boolean forward) {
+ isForward = forward;
+ }
+
/**
* Gets the channel state rescaler used for rescaling persisted data on
downstream side of this
* JobEdge.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
index 49024fe..4ba0bf6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/topology/DefaultLogicalTopology.java
@@ -91,7 +91,7 @@ public class DefaultLogicalTopology implements
LogicalTopology {
return verticesSorted;
}
- private DefaultLogicalVertex getVertex(final JobVertexID vertexId) {
+ public DefaultLogicalVertex getVertex(final JobVertexID vertexId) {
return Optional.ofNullable(idToVertexMap.get(vertexId))
.orElseThrow(
() -> new IllegalArgumentException("can not find
vertex: " + vertexId));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 03300ed..58795fc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -47,6 +47,7 @@ import
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.clock.SystemClock;
@@ -178,6 +179,16 @@ public final class DefaultSlotPoolServiceSchedulerFactory
new DeclarativeSlotPoolServiceFactory(
SystemClock.getInstance(), slotIdleTimeout,
rpcTimeout);
break;
+ case AdaptiveBatch:
+ schedulerNGFactory = new AdaptiveBatchSchedulerFactory();
+ slotPoolServiceFactory =
+ new DeclarativeSlotPoolBridgeServiceFactory(
+ SystemClock.getInstance(),
+ rpcTimeout,
+ slotIdleTimeout,
+ batchSlotTimeout,
+ getRequestSlotMatchingStrategy(configuration,
jobType));
+ break;
default:
throw new IllegalArgumentException(
String.format(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
index 89034c7..d7454c8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
@@ -24,7 +24,8 @@ import
org.apache.flink.runtime.executiongraph.ExecutionVertex;
import java.util.concurrent.CompletableFuture;
-class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
+/** Default implementation of {@link ExecutionVertexOperations}. */
+public class DefaultExecutionVertexOperations implements
ExecutionVertexOperations {
@Override
public void deploy(final ExecutionVertex executionVertex) throws
JobException {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index fba1bfa..1d0f3b2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -85,7 +85,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/** The future default scheduler. */
public class DefaultScheduler extends SchedulerBase implements
SchedulerOperations {
- private final Logger log;
+ protected final Logger log;
private final ClassLoader userCodeLoader;
@@ -95,7 +95,7 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
private final ScheduledExecutor delayExecutor;
- private final SchedulingStrategy schedulingStrategy;
+ protected final SchedulingStrategy schedulingStrategy;
private final ExecutionVertexOperations executionVertexOperations;
@@ -136,6 +136,57 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
final ShuffleMaster<?> shuffleMaster,
final Time rpcTimeout)
throws Exception {
+ this(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionVertexOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ computeVertexParallelismStore(jobGraph));
+ }
+
+ protected DefaultScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionVertexOperations executionVertexOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismStore vertexParallelismStore)
+ throws Exception {
super(
log,
@@ -151,7 +202,7 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
mainThreadExecutor,
jobStatusListener,
executionGraphFactory,
- computeVertexParallelismStore(jobGraph));
+ vertexParallelismStore);
this.log = log;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
index 0898c84..db72afa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
@@ -19,28 +19,21 @@
package org.apache.flink.runtime.scheduler;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobgraph.JobType;
-import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
-import
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
import org.apache.flink.util.clock.SystemClock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.function.Consumer;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -50,7 +43,6 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
* PipelinedRegionSchedulingStrategy}.
*/
public class DefaultSchedulerComponents {
- private static final Logger LOG =
LoggerFactory.getLogger(DefaultSchedulerComponents.class);
private final SchedulingStrategyFactory schedulingStrategyFactory;
private final Consumer<ComponentMainThreadExecutor> startUpAction;
@@ -99,7 +91,8 @@ public class DefaultSchedulerComponents {
final Time slotRequestTimeout) {
final SlotSelectionStrategy slotSelectionStrategy =
- selectSlotSelectionStrategy(jobType, jobMasterConfiguration);
+ SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+ jobType, jobMasterConfiguration);
final PhysicalSlotRequestBulkChecker bulkChecker =
PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
slotPool, SystemClock.getInstance());
@@ -116,34 +109,4 @@ public class DefaultSchedulerComponents {
bulkChecker::start,
allocatorFactory);
}
-
- @VisibleForTesting
- static SlotSelectionStrategy selectSlotSelectionStrategy(
- final JobType jobType, final Configuration configuration) {
- final boolean evenlySpreadOutSlots =
-
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
-
- final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
-
- locationPreferenceSlotSelectionStrategy =
- evenlySpreadOutSlots
- ?
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
- :
LocationPreferenceSlotSelectionStrategy.createDefault();
-
- final boolean isLocalRecoveryEnabled =
- configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
- if (isLocalRecoveryEnabled) {
- if (jobType == JobType.STREAMING) {
- return PreviousAllocationSlotSelectionStrategy.create(
- locationPreferenceSlotSelectionStrategy);
- } else {
- LOG.warn(
- "Batch job does not support local recovery. Falling
back to use "
- +
locationPreferenceSlotSelectionStrategy.getClass());
- return locationPreferenceSlotSelectionStrategy;
- }
- } else {
- return locationPreferenceSlotSelectionStrategy;
- }
- }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
index 1dedf15..11cd1c9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
@@ -25,7 +25,7 @@ import
org.apache.flink.runtime.executiongraph.ExecutionVertex;
import java.util.concurrent.CompletableFuture;
/** Operations on the {@link ExecutionVertex}. */
-interface ExecutionVertexOperations {
+public interface ExecutionVertexOperations {
void deploy(ExecutionVertex executionVertex) throws JobException;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 70d4ab2..561e07a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -152,7 +152,7 @@ public abstract class SchedulerBase implements SchedulerNG,
CheckpointScheduling
private final ExecutionGraphHandler executionGraphHandler;
- private final OperatorCoordinatorHandler operatorCoordinatorHandler;
+ protected final OperatorCoordinatorHandler operatorCoordinatorHandler;
private final ComponentMainThreadExecutor mainThreadExecutor;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
index 2fc3bb3..5edcb3b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
@@ -24,7 +24,7 @@ import
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecke
import
org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
/** Factory for {@link SlotSharingExecutionSlotAllocator}. */
-class SlotSharingExecutionSlotAllocatorFactory implements
ExecutionSlotAllocatorFactory {
+public class SlotSharingExecutionSlotAllocatorFactory implements
ExecutionSlotAllocatorFactory {
private final PhysicalSlotProvider slotProvider;
private final boolean slotWillBeOccupiedIndefinitely;
@@ -35,7 +35,7 @@ class SlotSharingExecutionSlotAllocatorFactory implements
ExecutionSlotAllocator
private final SlotSharingStrategy.Factory slotSharingStrategyFactory;
- SlotSharingExecutionSlotAllocatorFactory(
+ public SlotSharingExecutionSlotAllocatorFactory(
PhysicalSlotProvider slotProvider,
boolean slotWillBeOccupiedIndefinitely,
PhysicalSlotRequestBulkChecker bulkChecker,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
new file mode 100644
index 0000000..1403fb4
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data
volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements
SchedulerOperations {
+
+ private final DefaultLogicalTopology logicalTopology;
+
+ private final VertexParallelismDecider vertexParallelismDecider;
+
+ private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
+
+ /**
+ * Creates an adaptive batch scheduler on top of {@link DefaultScheduler}.
+ *
+ * <p>Every argument except {@code vertexParallelismDecider} and {@code defaultMaxParallelism}
+ * is passed to the super constructor in the same order. The parallelism store handed to the
+ * super constructor is built by {@link #computeVertexParallelismStoreForDynamicGraph} from the
+ * job graph's vertices and {@code defaultMaxParallelism}.
+ *
+ * @param vertexParallelismDecider decider consulted when the parallelism of a job vertex has to
+ * be derived from the results it consumes
+ * @param defaultMaxParallelism max parallelism used for vertices whose parallelism is not a
+ * positive value
+ * @throws Exception declared by this constructor; presumably propagated from the super
+ * constructor, which is not visible here
+ */
+ AdaptiveBatchScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionVertexOperations executionVertexOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismDecider vertexParallelismDecider,
+ int defaultMaxParallelism)
+ throws Exception {
+
+ super(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionVertexOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ computeVertexParallelismStoreForDynamicGraph(
+ jobGraph.getVertices(), defaultMaxParallelism));
+
+ this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
+
+ this.vertexParallelismDecider = vertexParallelismDecider;
+
+ // Forward groups are computed after super(...) because the execution graph is used to
+ // resolve each JobVertexID to its ExecutionJobVertex.
+ this.forwardGroupsByJobVertexId =
+ ForwardGroupComputeUtil.computeForwardGroups(
+ jobGraph.getVerticesSortedTopologicallyFromSources(),
+ getExecutionGraph()::getJobVertex);
+ }
+
+ /**
+ * Initializes every job vertex that can already be initialized, then delegates to the
+ * scheduling start logic of the super class.
+ */
+ @Override
+ public void startSchedulingInternal() {
+ // Must run before the super call so that initializable vertices exist when scheduling starts.
+ initializeVerticesIfPossible();
+
+ super.startSchedulingInternal();
+ }
+
+ /**
+ * Tries to initialize further job vertices and then forwards the state transition to the super
+ * class.
+ *
+ * @param executionVertexId id of the execution vertex the transition belongs to
+ * @param taskExecutionState the state transition, passed through unchanged
+ */
+ @Override
+ protected void updateTaskExecutionStateInternal(
+ final ExecutionVertexID executionVertexId,
+ final TaskExecutionStateTransition taskExecutionState) {
+
+ // Vertex initialization is attempted on every state update, before the super class handles it.
+ initializeVerticesIfPossible();
+
+ super.updateTaskExecutionStateInternal(executionVertexId,
taskExecutionState);
+ }
+
+ /**
+  * Decides parallelisms where possible and initializes every job vertex that has become
+  * initializable.
+  *
+  * <p>Works in two passes over the topologically ordered vertices: the first pass tries to fix
+  * the parallelism of each vertex, the second pass initializes the vertices accepted by {@link
+  * #canInitialize}. A {@link JobException} raised on the way is logged and fails the job; the
+  * vertices initialized up to that point are still reported to {@link #updateTopology}.
+  */
+ private void initializeVerticesIfPossible() {
+     final List<ExecutionJobVertex> initializedNow = new ArrayList<>();
+     try {
+         // One timestamp is shared by all vertices initialized in this round.
+         final long createTimestamp = System.currentTimeMillis();
+
+         // Pass 1: decide parallelisms.
+         getExecutionGraph().getVerticesTopologically().forEach(this::maybeSetParallelism);
+
+         // Pass 2: initialize what is ready. Kept as a loop because initialization throws a
+         // checked exception.
+         for (ExecutionJobVertex candidate : getExecutionGraph().getVerticesTopologically()) {
+             if (!canInitialize(candidate)) {
+                 continue;
+             }
+             getExecutionGraph().initializeJobVertex(candidate, createTimestamp);
+             initializedNow.add(candidate);
+         }
+     } catch (JobException ex) {
+         log.error("Unexpected error occurred when initializing ExecutionJobVertex", ex);
+         failJob(ex, System.currentTimeMillis());
+     }
+
+     if (!initializedNow.isEmpty()) {
+         updateTopology(initializedNow);
+     }
+ }
+
+ private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+ if (jobVertex.isParallelismDecided()) {
+ return;
+ }
+
+ Optional<List<BlockingResultInfo>> consumedResultsInfo =
+ tryGetConsumedResultsInfo(jobVertex);
+ if (!consumedResultsInfo.isPresent()) {
+ return;
+ }
+
+ ForwardGroup forwardGroup =
forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId());
+ int parallelism;
+
+ if (forwardGroup != null && forwardGroup.isParallelismDecided()) {
+ parallelism = forwardGroup.getParallelism();
+ log.info(
+ "Parallelism of JobVertex: {} ({}) is decided to be {}
according to forward group's parallelism.",
+ jobVertex.getName(),
+ jobVertex.getJobVertexId(),
+ parallelism);
+
+ } else {
+ parallelism =
+
vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo.get());
+ if (forwardGroup != null) {
+ forwardGroup.setParallelism(parallelism);
+ }
+
+ log.info(
+ "Parallelism of JobVertex: {} ({}) is decided to be {}.",
+ jobVertex.getName(),
+ jobVertex.getJobVertexId(),
+ parallelism);
+ }
+
+ changeJobVertexParallelism(jobVertex, parallelism);
+ }
+
+ private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int
parallelism) {
+ // update the JSON Plan, it's needed to enable REST APIs to return the
latest parallelism of
+ // job
+ // vertices
+ jobVertex.getJobVertex().setParallelism(parallelism);
+ try {
+
getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph()));
+ } catch (Throwable t) {
+ log.warn("Cannot create JSON plan for job", t);
+ // give the graph an empty plan
+ getExecutionGraph().setJsonPlan("{}");
+ }
+
+ jobVertex.setParallelism(parallelism);
+ }
+
+ /**
+  * Collects a {@link BlockingResultInfo} for every result consumed by the given job vertex.
+  *
+  * @param jobVertex the consuming job vertex
+  * @return the infos of all consumed results, or {@link Optional#empty()} as soon as one
+  *     producer vertex is found that has not finished
+  */
+ private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(
+         final ExecutionJobVertex jobVertex) {
+
+     final DefaultLogicalVertex logicalVertex =
+             logicalTopology.getVertex(jobVertex.getJobVertexId());
+     final Iterable<DefaultLogicalResult> consumedResults = logicalVertex.getConsumedResults();
+
+     final List<BlockingResultInfo> resultInfos = new ArrayList<>();
+     for (DefaultLogicalResult consumedResult : consumedResults) {
+         final ExecutionJobVertex producer =
+                 getExecutionJobVertex(consumedResult.getProducer().getId());
+         if (!producer.isFinished()) {
+             // not all inputs consumable, return Optional.empty()
+             return Optional.empty();
+         }
+
+         final IntermediateResult intermediateResult =
+                 getExecutionGraph().getAllIntermediateResults().get(consumedResult.getId());
+         checkNotNull(intermediateResult);
+
+         resultInfos.add(BlockingResultInfo.createFromIntermediateResult(intermediateResult));
+     }
+
+     return Optional.of(resultInfos);
+ }
+
+ /**
+  * Tells whether the given job vertex can be initialized now.
+  *
+  * @param jobVertex the vertex to check
+  * @return {@code true} iff the vertex is not yet initialized, its parallelism is decided, and
+  *     every producer vertex on its input edges is already initialized
+  */
+ private boolean canInitialize(final ExecutionJobVertex jobVertex) {
+     if (jobVertex.isInitialized()) {
+         return false;
+     }
+     if (!jobVertex.isParallelismDecided()) {
+         return false;
+     }
+
+     // all the upstream job vertices need to have been initialized
+     for (JobEdge inputEdge : jobVertex.getJobVertex().getInputs()) {
+         final JobVertexID producerId = inputEdge.getSource().getProducer().getID();
+         final ExecutionJobVertex producer =
+                 checkNotNull(getExecutionGraph().getJobVertex(producerId));
+         if (!producer.isInitialized()) {
+             return false;
+         }
+     }
+
+     return true;
+ }
+
+ /**
+  * Starts the operator coordinators of the newly initialized job vertices and then notifies the
+  * execution graph about them.
+  *
+  * @param newlyInitializedJobVertices the job vertices initialized in the current round
+  */
+ private void updateTopology(final List<ExecutionJobVertex> newlyInitializedJobVertices) {
+     newlyInitializedJobVertices.forEach(this::initializeOperatorCoordinatorsFor);
+
+     // notify execution graph updated, and try to update the execution topology.
+     getExecutionGraph().notifyNewlyInitializedJobVertices(newlyInitializedJobVertices);
+ }
+
+ /**
+ * Registers and starts the operator coordinators of the given job vertex through the
+ * {@code operatorCoordinatorHandler}, using this scheduler's main thread executor.
+ *
+ * @param vertex the newly initialized job vertex
+ */
+ private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
+ operatorCoordinatorHandler.registerAndStartNewCoordinators(
+ vertex.getOperatorCoordinators(), getMainThreadExecutor());
+ }
+
+ /**
+  * Builds the {@link VertexParallelismStore} for the vertices of a dynamic graph.
+  *
+  * <p>Parallelisms are taken as they are (no normalization). The default max parallelism of a
+  * vertex is derived from its parallelism when that parallelism is positive; otherwise the
+  * given global default max parallelism is used.
+  *
+  * @param vertices the vertices to compute parallelism for
+  * @param defaultMaxParallelism the global default max parallelism
+  * @return the computed parallelism store
+  */
+ @VisibleForTesting
+ public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
+         Iterable<JobVertex> vertices, int defaultMaxParallelism) {
+     // A dynamic graph keeps vertex parallelisms un-normalized, hence the identity function.
+     // Only a vertex with a positive parallelism can have its max parallelism computed from
+     // that parallelism; any other vertex falls back to the global default max parallelism.
+     return computeVertexParallelismStore(
+             vertices,
+             v -> v.getParallelism() > 0 ? getDefaultMaxParallelism(v) : defaultMaxParallelism,
+             Function.identity());
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
new file mode 100644
index 0000000..541c56e
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
+import
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
+import
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Factory for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
+
+ /**
+  * Creates an {@link AdaptiveBatchScheduler} for the given job.
+  *
+  * <p>The job must be a batch job and all of its produced data sets must be blocking; otherwise
+  * an {@link IllegalStateException} is thrown before anything is created. The slot pool service
+  * must be castable to {@link SlotPool}.
+  *
+  * @throws IllegalStateException if the job is not a batch job, if a data set is not blocking,
+  *     or if the slot pool service is not a {@link SlotPool}
+  * @throws Exception declared by the interface method; the restart strategy deserialization
+  *     and the scheduler constructor are the visible calls that may propagate one
+  */
+ @Override
+ public SchedulerNG createInstance(
+         Logger log,
+         JobGraph jobGraph,
+         Executor ioExecutor,
+         Configuration jobMasterConfiguration,
+         SlotPoolService slotPoolService,
+         ScheduledExecutorService futureExecutor,
+         ClassLoader userCodeLoader,
+         CheckpointRecoveryFactory checkpointRecoveryFactory,
+         Time rpcTimeout,
+         BlobWriter blobWriter,
+         JobManagerJobMetricGroup jobManagerJobMetricGroup,
+         Time slotRequestTimeout,
+         ShuffleMaster<?> shuffleMaster,
+         JobMasterPartitionTracker partitionTracker,
+         ExecutionDeploymentTracker executionDeploymentTracker,
+         long initializationTimestamp,
+         ComponentMainThreadExecutor mainThreadExecutor,
+         FatalErrorHandler fatalErrorHandler,
+         JobStatusListener jobStatusListener)
+         throws Exception {
+
+     checkState(
+             jobGraph.getJobType() == JobType.BATCH,
+             "Adaptive batch scheduler only supports batch jobs");
+     checkAllExchangesBlocking(jobGraph);
+
+     // The message names this scheduler (it used to mention the DefaultScheduler), so that a
+     // misconfiguration is reported against the component that actually rejected it.
+     final SlotPool slotPool =
+             slotPoolService
+                     .castInto(SlotPool.class)
+                     .orElseThrow(
+                             () ->
+                                     new IllegalStateException(
+                                             "The AdaptiveBatchScheduler requires a SlotPool."));
+
+     final SlotSelectionStrategy slotSelectionStrategy =
+             SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                     JobType.BATCH, jobMasterConfiguration);
+     final PhysicalSlotRequestBulkChecker bulkChecker =
+             PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
+                     slotPool, SystemClock.getInstance());
+     final PhysicalSlotProvider physicalSlotProvider =
+             new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+     // 'false' is the slotWillBeOccupiedIndefinitely argument of the allocator factory.
+     final ExecutionSlotAllocatorFactory allocatorFactory =
+             new SlotSharingExecutionSlotAllocatorFactory(
+                     physicalSlotProvider, false, bulkChecker, slotRequestTimeout);
+
+     final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+             RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+                             jobGraph.getSerializedExecutionConfig()
+                                     .deserializeValue(userCodeLoader)
+                                     .getRestartStrategy(),
+                             jobMasterConfiguration,
+                             jobGraph.isCheckpointingEnabled())
+                     .create();
+     log.info(
+             "Using restart back off time strategy {} for {} ({}).",
+             restartBackoffTimeStrategy,
+             jobGraph.getName(),
+             jobGraph.getJobID());
+
+     // NOTE(review): the trailing 'true' is presumably the "dynamic graph" flag of
+     // DefaultExecutionGraphFactory - confirm against its constructor.
+     final ExecutionGraphFactory executionGraphFactory =
+             new DefaultExecutionGraphFactory(
+                     jobMasterConfiguration,
+                     userCodeLoader,
+                     executionDeploymentTracker,
+                     futureExecutor,
+                     ioExecutor,
+                     rpcTimeout,
+                     jobManagerJobMetricGroup,
+                     blobWriter,
+                     shuffleMaster,
+                     partitionTracker,
+                     true);
+
+     return new AdaptiveBatchScheduler(
+             log,
+             jobGraph,
+             ioExecutor,
+             jobMasterConfiguration,
+             bulkChecker::start,
+             new ScheduledExecutorServiceAdapter(futureExecutor),
+             userCodeLoader,
+             new CheckpointsCleaner(),
+             checkpointRecoveryFactory,
+             jobManagerJobMetricGroup,
+             new VertexwiseSchedulingStrategy.Factory(),
+             FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
+             restartBackoffTimeStrategy,
+             new DefaultExecutionVertexOperations(),
+             new ExecutionVertexVersioner(),
+             allocatorFactory,
+             initializationTimestamp,
+             mainThreadExecutor,
+             jobStatusListener,
+             executionGraphFactory,
+             shuffleMaster,
+             rpcTimeout,
+             DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+             jobMasterConfiguration.getInteger(
+                     JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
+ }
+
+ /**
+  * Verifies that every data set produced by any vertex of the job graph has a blocking result
+  * type.
+  *
+  * @param jobGraph the job graph to validate
+  * @throws IllegalStateException if a produced data set is not blocking; the message names the
+  *     shuffle-mode option that has to be configured
+  */
+ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
+     for (JobVertex jobVertex : jobGraph.getVertices()) {
+         for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
+             // The template overload of checkState formats the message only when the check
+             // fails, instead of running String.format for every data set.
+             checkState(
+                     dataSet.getResultType().isBlocking(),
+                     "At the moment, adaptive batch scheduler requires batch workloads "
+                             + "to be executed with types of all edges being BLOCKING. "
+                             + "To do that, you need to configure '%s' to '%s'.",
+                     ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
+                     BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
+         }
+     }
+ }
+
+ /** Returns the scheduler type of this factory, which is always {@code AdaptiveBatch}. */
+ @Override
+ public JobManagerOptions.SchedulerType getSchedulerType() {
+ return JobManagerOptions.SchedulerType.AdaptiveBatch;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
index 1248fd1..11a45a6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
@@ -18,8 +18,18 @@
package org.apache.flink.runtime.scheduler.adaptivebatch;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+
+import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** The blocking result info, which will be used to calculate the vertex
parallelism. */
public class BlockingResultInfo {
@@ -40,12 +50,32 @@ public class BlockingResultInfo {
return isBroadcast;
}
- public static BlockingResultInfo createFromBroadcastResult(List<Long>
blockingPartitionSizes) {
+ @VisibleForTesting
+ static BlockingResultInfo createFromBroadcastResult(List<Long>
blockingPartitionSizes) {
return new BlockingResultInfo(blockingPartitionSizes, true);
}
- public static BlockingResultInfo createFromNonBroadcastResult(
- List<Long> blockingPartitionSizes) {
+ @VisibleForTesting
+ static BlockingResultInfo createFromNonBroadcastResult(List<Long>
blockingPartitionSizes) {
return new BlockingResultInfo(blockingPartitionSizes, false);
}
+
+ /**
+  * Creates a {@link BlockingResultInfo} from an intermediate result by reading, for each of its
+  * partitions, the number of bytes produced as recorded in the IO metrics of the producer's
+  * current execution attempt.
+  *
+  * @param intermediateResult the result to describe; must not be {@code null} and all of its
+  *     partitions must be consumable
+  * @return the info holding one size per partition and the broadcast flag of the result
+  */
+ public static BlockingResultInfo createFromIntermediateResult(
+         IntermediateResult intermediateResult) {
+     checkArgument(intermediateResult != null);
+
+     final List<Long> partitionSizes = new ArrayList<>();
+     for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
+         checkState(partition.isConsumable());
+
+         final IOMetrics producerMetrics =
+                 checkNotNull(
+                         partition.getProducer().getCurrentExecutionAttempt().getIOMetrics(),
+                         "IOMetrics should not be null.");
+
+         // Kept boxed: the map lookup result is stored as is.
+         final Long producedBytes =
+                 producerMetrics.getNumBytesProducedOfPartitions().get(partition.getPartitionId());
+         partitionSizes.add(producedBytes);
+     }
+
+     return new BlockingResultInfo(partitionSizes, intermediateResult.isBroadcast());
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroup.java
new file mode 100644
index 0000000..fa5dffc
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroup.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A forward group is a set of job vertices connected via forward edges. Parallelisms of all job
+ * vertices in the same {@link ForwardGroup} must be the same.
+ *
+ * <p>The group parallelism starts as {@link ExecutionConfig#PARALLELISM_DEFAULT} unless one of the
+ * member vertices already has a decided parallelism, and it can be set exactly once afterwards.
+ */
+public class ForwardGroup {
+
+    /** Parallelism shared by the group; counts as decided only when it is positive. */
+    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+    /** Ids of all job vertices that belong to this group. */
+    private final Set<JobVertexID> jobVertexIds = new HashSet<>();
+
+    /**
+     * Creates a forward group from its member vertices.
+     *
+     * @param jobVertices the members of the group; must not be {@code null}
+     * @throws IllegalStateException if the members already carry more than one distinct decided
+     *     parallelism
+     */
+    public ForwardGroup(final Set<ExecutionJobVertex> jobVertices) {
+        checkNotNull(jobVertices);
+
+        // Ids are collected in their own step rather than as a side effect inside a stream
+        // predicate, so the stream pipeline below stays side-effect free.
+        for (ExecutionJobVertex jobVertex : jobVertices) {
+            jobVertexIds.add(jobVertex.getJobVertexId());
+        }
+
+        final Set<Integer> decidedParallelisms =
+                jobVertices.stream()
+                        .filter(ExecutionJobVertex::isParallelismDecided)
+                        .map(ExecutionJobVertex::getParallelism)
+                        .collect(Collectors.toSet());
+
+        checkState(
+                decidedParallelisms.size() <= 1,
+                "Job vertices of one forward group have different decided parallelisms: %s",
+                decidedParallelisms);
+        if (decidedParallelisms.size() == 1) {
+            this.parallelism = decidedParallelisms.iterator().next();
+        }
+    }
+
+    /**
+     * Sets the parallelism of the group.
+     *
+     * @param parallelism the parallelism to use for all members
+     * @throws IllegalStateException if the current value is no longer {@link
+     *     ExecutionConfig#PARALLELISM_DEFAULT}
+     */
+    public void setParallelism(int parallelism) {
+        checkState(
+                this.parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
+                "The parallelism of the forward group has already been set.");
+        this.parallelism = parallelism;
+    }
+
+    /** Returns whether the group parallelism is decided, i.e. is a positive value. */
+    public boolean isParallelismDecided() {
+        return parallelism > 0;
+    }
+
+    /**
+     * Returns the decided parallelism of the group.
+     *
+     * @throws IllegalStateException if the parallelism is not decided yet
+     */
+    public int getParallelism() {
+        checkState(isParallelismDecided(), "The parallelism of the forward group is not decided.");
+        return parallelism;
+    }
+
+    /** Returns the number of job vertices in this group. */
+    public int size() {
+        return jobVertexIds.size();
+    }
+
+    /** Returns the ids of the member vertices; this is the internal set, not a copy. */
+    public Set<JobVertexID> getJobVertexIds() {
+        return jobVertexIds;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtil.java
new file mode 100644
index 0000000..28491cf
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;
+
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Common utils for computing forward groups. */
+public class ForwardGroupComputeUtil {
+
+ public static Map<JobVertexID, ForwardGroup> computeForwardGroups(
+ final Iterable<JobVertex> topologicallySortedVertices,
+ Function<JobVertexID, ExecutionJobVertex>
executionJobVertexRetriever) {
+
+ final Map<JobVertex, Set<JobVertex>> vertexToGroup = new
IdentityHashMap<>();
+
+ // iterate all the vertices which are topologically sorted
+ for (JobVertex vertex : topologicallySortedVertices) {
+ Set<JobVertex> currentGroup = new HashSet<>();
+ currentGroup.add(vertex);
+ vertexToGroup.put(vertex, currentGroup);
+
+ for (JobEdge input : getForwardInputs(vertex)) {
+ final JobVertex producerVertex =
input.getSource().getProducer();
+ final Set<JobVertex> producerGroup =
vertexToGroup.get(producerVertex);
+
+ if (producerGroup == null) {
+ throw new IllegalStateException(
+ "Producer task "
+ + producerVertex.getID()
+ + " forward group is null"
+ + " while calculating forward group for
the consumer task "
+ + vertex.getID()
+ + ". This should be a forward group
building bug.");
+ }
+
+ if (currentGroup != producerGroup) {
+ currentGroup =
+ VertexGroupComputeUtil.mergeVertexGroups(
+ currentGroup, producerGroup,
vertexToGroup);
+ }
+ }
+ }
+
+ final Map<JobVertexID, ForwardGroup> ret = new HashMap<>();
+ for (Set<JobVertex> vertexGroup :
+ VertexGroupComputeUtil.uniqueVertexGroups(vertexToGroup)) {
+ if (vertexGroup.size() > 1) {
+ ForwardGroup forwardGroup =
+ new ForwardGroup(
+ vertexGroup.stream()
+ .map(
+ vertex ->
+
executionJobVertexRetriever.apply(
+
vertex.getID()))
+ .collect(Collectors.toSet()));
+ for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds())
{
+ ret.put(jobVertexId, forwardGroup);
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Returns the input edges of the given job vertex that are forward edges.
+ *
+ * @param jobVertex the vertex whose inputs are inspected
+ * @return the forward input edges, collected into a set (no particular iteration order)
+ */
+ static Iterable<JobEdge> getForwardInputs(JobVertex jobVertex) {
+ return jobVertex.getInputs().stream()
+ .filter(JobEdge::isForward)
+ .collect(Collectors.toSet());
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
new file mode 100644
index 0000000..3eb7a9f
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobType;
+import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for selecting {@link SlotSelectionStrategy}. */
+public class SlotSelectionStrategyUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SlotSelectionStrategyUtils.class);
+
+ public static SlotSelectionStrategy selectSlotSelectionStrategy(
+ final JobType jobType, final Configuration configuration) {
+ final boolean evenlySpreadOutSlots =
+
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+ final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
+
+ locationPreferenceSlotSelectionStrategy =
+ evenlySpreadOutSlots
+ ?
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
+ :
LocationPreferenceSlotSelectionStrategy.createDefault();
+
+ final boolean isLocalRecoveryEnabled =
+ configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
+ if (isLocalRecoveryEnabled) {
+ if (jobType == JobType.STREAMING) {
+ return PreviousAllocationSlotSelectionStrategy.create(
+ locationPreferenceSlotSelectionStrategy);
+ } else {
+ LOG.warn(
+ "Batch job does not support local recovery. Falling
back to use "
+ +
locationPreferenceSlotSelectionStrategy.getClass());
+ return locationPreferenceSlotSelectionStrategy;
+ }
+ } else {
+ return locationPreferenceSlotSelectionStrategy;
+ }
+ }
+
+ /** Private default constructor to avoid being instantiated. */
+ private SlotSelectionStrategyUtils() {}
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
index 240427b..3ec1bb0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -24,15 +24,14 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
-import java.util.function.Function;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.hamcrest.CoreMatchers.is;
@@ -210,38 +209,11 @@ public class ExecutionJobVertexTest {
final DefaultExecutionGraph eg =
TestingDefaultExecutionGraphBuilder.newBuilder().build();
final VertexParallelismStore vertexParallelismStore =
- computeVertexParallelismStoreForDynamicGraph(
+
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
Collections.singletonList(jobVertex),
defaultMaxParallelism);
final VertexParallelismInformation vertexParallelismInfo =
vertexParallelismStore.getParallelismInfo(jobVertex.getID());
return new ExecutionJobVertex(eg, jobVertex, vertexParallelismInfo);
}
-
- /**
- * Compute the {@link VertexParallelismStore} for all given vertices in a
dynamic graph, which
- * will set defaults and ensure that the returned store contains valid
parallelisms, with the
- * configured default max parallelism.
- *
- * @param vertices the vertices to compute parallelism for
- * @param defaultMaxParallelism the global default max parallelism
- * @return the computed parallelism store
- */
- public static VertexParallelismStore
computeVertexParallelismStoreForDynamicGraph(
- Iterable<JobVertex> vertices, int defaultMaxParallelism) {
- // for dynamic graph, there is no need to normalize vertex
parallelism. if the max
- // parallelism is not configured and the parallelism is a positive
value, max
- // parallelism can be computed against the parallelism, otherwise it
needs to use the
- // global default max parallelism.
- return SchedulerBase.computeVertexParallelismStore(
- vertices,
- v -> {
- if (v.getParallelism() > 0) {
- return SchedulerBase.getDefaultMaxParallelism(v);
- } else {
- return defaultMaxParallelism;
- }
- },
- Function.identity());
- }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index 611e18a..56be6d2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -270,7 +271,7 @@ public class IntermediateResultPartitionTest extends
TestLogger {
public static VertexParallelismStore
computeVertexParallelismStoreConsideringDynamicGraph(
Iterable<JobVertex> vertices, boolean isDynamicGraph, int
defaultMaxParallelism) {
if (isDynamicGraph) {
- return
ExecutionJobVertexTest.computeVertexParallelismStoreForDynamicGraph(
+ return
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
vertices, defaultMaxParallelism);
} else {
return SchedulerBase.computeVertexParallelismStore(vertices);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
index 85e225b..f459c54 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
@@ -20,14 +20,10 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobType;
-import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
-import
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.util.TestLogger;
@@ -81,32 +77,6 @@ public class DefaultSchedulerComponentsFactoryTest extends
TestLogger {
}
}
- @Test
- public void
testCreatePreviousAllocationSlotSelectionStrategyForLocalRecoveryStreamingJob()
{
- final Configuration configuration = new Configuration();
- configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
-
- final SlotSelectionStrategy slotSelectionStrategy =
- DefaultSchedulerComponents.selectSlotSelectionStrategy(
- JobType.STREAMING, configuration);
-
- assertThat(
- slotSelectionStrategy,
instanceOf(PreviousAllocationSlotSelectionStrategy.class));
- }
-
- @Test
- public void
testCreateLocationPreferenceSlotSelectionStrategyForLocalRecoveryBatchJob() {
- final Configuration configuration = new Configuration();
- configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
-
- final SlotSelectionStrategy slotSelectionStrategy =
- DefaultSchedulerComponents.selectSlotSelectionStrategy(
- JobType.BATCH, configuration);
-
- assertThat(
- slotSelectionStrategy,
instanceOf(LocationPreferenceSlotSelectionStrategy.class));
- }
-
private static DefaultSchedulerComponents createSchedulerComponents(
final Configuration configuration) {
return createSchedulerComponents(configuration, false, JobType.BATCH);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index d214d80..aeb8bf0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -387,39 +387,41 @@ public class SchedulerTestingUtils {
/** Builder for {@link DefaultScheduler}. */
public static class DefaultSchedulerBuilder {
- private final JobGraph jobGraph;
+ protected final JobGraph jobGraph;
- private final ComponentMainThreadExecutor mainThreadExecutor;
+ protected final ComponentMainThreadExecutor mainThreadExecutor;
- private SchedulingStrategyFactory schedulingStrategyFactory =
+ protected SchedulingStrategyFactory schedulingStrategyFactory =
new PipelinedRegionSchedulingStrategy.Factory();
- private Logger log = LOG;
- private Executor ioExecutor = TestingUtils.defaultExecutor();
- private Configuration jobMasterConfiguration = new Configuration();
- private ScheduledExecutorService futureExecutor =
TestingUtils.defaultExecutor();
- private ScheduledExecutor delayExecutor =
+ protected Logger log = LOG;
+ protected Executor ioExecutor = TestingUtils.defaultExecutor();
+ protected Configuration jobMasterConfiguration = new Configuration();
+ protected ScheduledExecutorService futureExecutor =
TestingUtils.defaultExecutor();
+ protected ScheduledExecutor delayExecutor =
new ScheduledExecutorServiceAdapter(futureExecutor);
- private ClassLoader userCodeLoader =
ClassLoader.getSystemClassLoader();
- private CheckpointsCleaner checkpointCleaner = new
CheckpointsCleaner();
- private CheckpointRecoveryFactory checkpointRecoveryFactory =
+ protected ClassLoader userCodeLoader =
ClassLoader.getSystemClassLoader();
+ protected CheckpointsCleaner checkpointCleaner = new
CheckpointsCleaner();
+ protected CheckpointRecoveryFactory checkpointRecoveryFactory =
new StandaloneCheckpointRecoveryFactory();
- private Time rpcTimeout = DEFAULT_TIMEOUT;
- private BlobWriter blobWriter = VoidBlobWriter.getInstance();
- private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+ protected Time rpcTimeout = DEFAULT_TIMEOUT;
+ protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
+ protected JobManagerJobMetricGroup jobManagerJobMetricGroup =
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
- private ShuffleMaster<?> shuffleMaster =
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
- private JobMasterPartitionTracker partitionTracker =
NoOpJobMasterPartitionTracker.INSTANCE;
- private FailoverStrategy.Factory failoverStrategyFactory =
+ protected ShuffleMaster<?> shuffleMaster =
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
+ protected JobMasterPartitionTracker partitionTracker =
+ NoOpJobMasterPartitionTracker.INSTANCE;
+ protected FailoverStrategy.Factory failoverStrategyFactory =
new RestartPipelinedRegionFailoverStrategy.Factory();
- private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+ protected RestartBackoffTimeStrategy restartBackoffTimeStrategy =
NoRestartBackoffTimeStrategy.INSTANCE;
- private ExecutionVertexOperations executionVertexOperations =
+ protected ExecutionVertexOperations executionVertexOperations =
new DefaultExecutionVertexOperations();
- private ExecutionVertexVersioner executionVertexVersioner = new
ExecutionVertexVersioner();
- private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
+ protected ExecutionVertexVersioner executionVertexVersioner =
+ new ExecutionVertexVersioner();
+ protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
new TestExecutionSlotAllocatorFactory();
- private JobStatusListener jobStatusListener = (ignoredA, ignoredB,
ignoredC) -> {};
+ protected JobStatusListener jobStatusListener = (ignoredA, ignoredB,
ignoredC) -> {};
public DefaultSchedulerBuilder(
final JobGraph jobGraph, ComponentMainThreadExecutor
mainThreadExecutor) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
index 9481c82..2c1bf64 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtilsTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartitionTest;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -208,7 +208,7 @@ public class SsgNetworkMemoryCalculationUtilsTest {
JobGraph jobGraph = createBatchGraph(slotSharingGroups,
Arrays.asList(4, -1, -1));
final VertexParallelismStore vertexParallelismStore =
-
ExecutionJobVertexTest.computeVertexParallelismStoreForDynamicGraph(
+
AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
jobGraph.getVertices(), defaultMaxParallelism);
return TestingDefaultExecutionGraphBuilder.newBuilder()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index 26f2933..1eda992 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -365,4 +365,9 @@ class StateTrackingMockExecutionGraph implements
ExecutionGraph {
throws JobException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex>
vertices) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
new file mode 100644
index 0000000..ab8a8e9
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Test for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerTest extends TestLogger {
+
+ private static final int SOURCE_PARALLELISM_1 = 6;
+ private static final int SOURCE_PARALLELISM_2 = 4;
+
+ private static final ComponentMainThreadExecutor mainThreadExecutor =
+ ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+ @Test
+ public void testAdaptiveBatchScheduler() throws Exception {
+ JobGraph jobGraph = createJobGraph(false);
+ Iterator<JobVertex> jobVertexIterator =
jobGraph.getVertices().iterator();
+ JobVertex source1 = jobVertexIterator.next();
+ JobVertex source2 = jobVertexIterator.next();
+ JobVertex sink = jobVertexIterator.next();
+
+ SchedulerBase scheduler = createScheduler(jobGraph);
+
+ final DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+ final ExecutionJobVertex sinkExecutionJobVertex =
graph.getJobVertex(sink.getID());
+
+ scheduler.startScheduling();
+ assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+ // trigger source1 finished.
+ transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
+ assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+ // trigger source2 finished.
+ transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
+ assertThat(sinkExecutionJobVertex.getParallelism(), is(10));
+
+ // check that the jobGraph is updated
+ assertThat(sink.getParallelism(), is(10));
+ }
+
+ @Test
+ public void testDecideParallelismForForwardTarget() throws Exception {
+ JobGraph jobGraph = createJobGraph(true);
+ Iterator<JobVertex> jobVertexIterator =
jobGraph.getVertices().iterator();
+ JobVertex source1 = jobVertexIterator.next();
+ JobVertex source2 = jobVertexIterator.next();
+ JobVertex sink = jobVertexIterator.next();
+
+ SchedulerBase scheduler = createScheduler(jobGraph);
+
+ final DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+ final ExecutionJobVertex sinkExecutionJobVertex =
graph.getJobVertex(sink.getID());
+
+ scheduler.startScheduling();
+ assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+ // trigger source1 finished.
+ transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
+ assertThat(sinkExecutionJobVertex.getParallelism(), is(-1));
+
+ // trigger source2 finished.
+ transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
+ assertThat(sinkExecutionJobVertex.getParallelism(),
is(SOURCE_PARALLELISM_1));
+
+ // check that the jobGraph is updated
+ assertThat(sink.getParallelism(), is(SOURCE_PARALLELISM_1));
+ }
+
+ /** Transition the state of all given executions. */
+ public static void transitionExecutionsState(
+ final SchedulerBase scheduler, final ExecutionState state,
List<Execution> executions) {
+ for (Execution execution : executions) {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ execution.getAttemptId(),
+ state,
+ null,
+ null,
+ new IOMetrics(0, 0, 0, 0)));
+ }
+ }
+
+ /** Transition the state of all executions in the given job vertex. */
+ public static void transitionExecutionsState(
+ final SchedulerBase scheduler, final ExecutionState state, final
JobVertex jobVertex) {
+ final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+ List<Execution> executions =
+
Arrays.asList(executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices())
+ .stream()
+ .map(ExecutionVertex::getCurrentExecutionAttempt)
+ .collect(Collectors.toList());
+ transitionExecutionsState(scheduler, state, executions);
+ }
+
+ public JobVertex createJobVertex(String jobVertexName, int parallelism) {
+ final JobVertex jobVertex = new JobVertex(jobVertexName);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+ if (parallelism > 0) {
+ jobVertex.setParallelism(parallelism);
+ }
+ return jobVertex;
+ }
+
+ public JobGraph createJobGraph(boolean withForwardEdge) {
+ final JobVertex source1 = createJobVertex("source1",
SOURCE_PARALLELISM_1);
+ final JobVertex source2 = createJobVertex("source2",
SOURCE_PARALLELISM_2);
+ final JobVertex sink = createJobVertex("sink", -1);
+ sink.connectNewDataSetAsInput(
+ source1, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ sink.connectNewDataSetAsInput(
+ source2, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ if (withForwardEdge) {
+
source1.getProducedDataSets().get(0).getConsumer().setForward(true);
+ }
+ return new JobGraph(new JobID(), "test job", source1, source2, sink);
+ }
+
+ public SchedulerBase createScheduler(JobGraph jobGraph) throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(
+ JobManagerOptions.SCHEDULER,
JobManagerOptions.SchedulerType.AdaptiveBatch);
+
+ final AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder
schedulerBuilder =
+ (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder)
+ new
AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+ jobGraph, mainThreadExecutor)
+ .setJobMasterConfiguration(configuration);
+ schedulerBuilder.setJobVertexParallelismDecider((ignored) -> 10);
+
+ return schedulerBuilder.build();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
new file mode 100644
index 0000000..116cb24
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+
+/** A utility class to create {@link AdaptiveBatchScheduler} instances for
testing. */
+public class AdaptiveBatchSchedulerTestUtils {
+
+ /** Builder for {@link AdaptiveBatchScheduler}. */
+ public static class AdaptiveBatchSchedulerBuilder
+ extends SchedulerTestingUtils.DefaultSchedulerBuilder {
+
+ private VertexParallelismDecider vertexParallelismDecider = (ignored)
-> 0;
+
+ private int defaultMaxParallelism =
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
+
+ public AdaptiveBatchSchedulerBuilder(
+ JobGraph jobGraph, ComponentMainThreadExecutor
mainThreadExecutor) {
+ super(jobGraph, mainThreadExecutor);
+ setSchedulingStrategyFactory(new
VertexwiseSchedulingStrategy.Factory());
+ }
+
+ public void setJobVertexParallelismDecider(
+ VertexParallelismDecider jobVertexParallelismDecider) {
+ this.vertexParallelismDecider = jobVertexParallelismDecider;
+ }
+
+ public void setDefaultMaxParallelism(int defaultMaxParallelism) {
+ this.defaultMaxParallelism = defaultMaxParallelism;
+ }
+
+ @Override
+ public AdaptiveBatchScheduler build() throws Exception {
+ final ExecutionGraphFactory executionGraphFactory =
+ new DefaultExecutionGraphFactory(
+ jobMasterConfiguration,
+ userCodeLoader,
+ new DefaultExecutionDeploymentTracker(),
+ futureExecutor,
+ ioExecutor,
+ rpcTimeout,
+ jobManagerJobMetricGroup,
+ blobWriter,
+ shuffleMaster,
+ partitionTracker,
+ true);
+
+ return new AdaptiveBatchScheduler(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ componentMainThreadExecutor -> {},
+ delayExecutor,
+ userCodeLoader,
+ checkpointCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionVertexOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ System.currentTimeMillis(),
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ vertexParallelismDecider,
+ defaultMaxParallelism);
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
new file mode 100644
index 0000000..b0d19f1
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link ForwardGroupComputeUtil}. */
+public class ForwardGroupComputeUtilTest extends TestLogger {
+    /**
+     * Verifies that a job graph consisting only of unconnected vertices yields no forward group.
+     *
+     * <pre>
+     * (v1)
+     *
+     * (v2)
+     *
+     * (v3)
+     * </pre>
+     */
+    @Test
+    public void testIsolatedVertices() throws Exception {
+        final Set<ForwardGroup> forwardGroups =
+                computeForwardGroups(
+                        new JobVertex("v1"), new JobVertex("v2"), new JobVertex("v3"));
+
+        checkGroupSize(forwardGroups, 0);
+    }
+
+ /**
+ * Tests the forward groups computed for three sequentially connected vertices while varying
+ * which of the two edges are marked as forward. NOTE(review): despite the method name, the
+ * cases below only toggle the forward flags; the helper connects every edge with a BLOCKING
+ * result partition.
+ *
+ * <pre>
+ *
+ * (v1) -> (v2) -> (v3)
+ *
+ * </pre>
+ */
+ @Test
+ public void testVariousResultPartitionTypesBetweenVertices() throws
Exception {
+ testThreeVerticesConnectSequentially(false, true, 1, 2); // only v2 -> v3 is forward: one group of size 2 expected
+ testThreeVerticesConnectSequentially(false, false, 0); // no forward edge: no group expected
+ testThreeVerticesConnectSequentially(true, true, 1, 3); // both edges forward: one group of size 3 expected
+ }
+
+    /**
+     * Builds the chain v1 -> v2 -> v3 with BLOCKING edges (ALL_TO_ALL first, POINTWISE second),
+     * optionally marks each edge as forward and checks the computed forward groups.
+     *
+     * @param isForward1 whether the v1 -> v2 edge is marked as forward
+     * @param isForward2 whether the v2 -> v3 edge is marked as forward
+     * @param numOfGroups the expected number of forward groups
+     * @param groupSizes the expected sizes of the forward groups
+     */
+    private void testThreeVerticesConnectSequentially(
+            boolean isForward1, boolean isForward2, int numOfGroups, int... groupSizes)
+            throws Exception {
+        final JobVertex head = new JobVertex("v1");
+        final JobVertex middle = new JobVertex("v2");
+        final JobVertex tail = new JobVertex("v3");
+
+        // v1 -> v2
+        middle.connectNewDataSetAsInput(
+                head, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        if (isForward1) {
+            head.getProducedDataSets().get(0).getConsumer().setForward(true);
+        }
+
+        // v2 -> v3
+        tail.connectNewDataSetAsInput(
+                middle, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+        if (isForward2) {
+            middle.getProducedDataSets().get(0).getConsumer().setForward(true);
+        }
+
+        checkGroupSize(computeForwardGroups(head, middle, tail), numOfGroups, groupSizes);
+    }
+
+ /**
+ * Tests the forward group computation for a job graph where two upstream vertices connect
+ * with one downstream vertex. Both inputs of v3 are marked as forward while the v3 -> v4 edge
+ * is not; the test expects a single forward group of size 3.
+ *
+ * <pre>
+ *
+ * (v1) --
+ * |
+ * --> (v3) -> (v4)
+ * |
+ * (v2) --
+ *
+ * </pre>
+ */
+ @Test
+ public void testTwoInputsMergesIntoOne() throws Exception {
+ JobVertex v1 = new JobVertex("v1");
+ JobVertex v2 = new JobVertex("v2");
+ JobVertex v3 = new JobVertex("v3");
+ JobVertex v4 = new JobVertex("v4");
+
+ v3.connectNewDataSetAsInput(
+ v1, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
+ v1.getProducedDataSets().get(0).getConsumer().setForward(true); // v1 -> v3 is forward
+ v3.connectNewDataSetAsInput(
+ v2, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ v2.getProducedDataSets().get(0).getConsumer().setForward(true); // v2 -> v3 is forward
+ v4.connectNewDataSetAsInput(
+ v3, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
+
+ Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
+
+ checkGroupSize(groups, 1, 3);
+ }
+
+ /**
+ * Tests the forward group computation for a job graph where one upstream vertex connects
+ * with two downstream vertices. The v1 -> v2 edge is not forward while both outputs of v2 are
+ * marked as forward; the test expects a single forward group of size 3.
+ *
+ * <pre>
+ *
+ * --> (v3)
+ * |
+ * (v1) -> (v2) --
+ * |
+ * --> (v4)
+ *
+ * </pre>
+ */
+ @Test
+ public void testOneInputSplitsIntoTwo() throws Exception {
+ JobVertex v1 = new JobVertex("v1");
+ JobVertex v2 = new JobVertex("v2");
+ JobVertex v3 = new JobVertex("v3");
+ JobVertex v4 = new JobVertex("v4");
+
+ v2.connectNewDataSetAsInput(
+ v1, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.BLOCKING);
+ v3.connectNewDataSetAsInput(
+ v2, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ v4.connectNewDataSetAsInput(
+ v2, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ v2.getProducedDataSets().get(0).getConsumer().setForward(true); // v2 -> v3 is forward
+ v2.getProducedDataSets().get(1).getConsumer().setForward(true); // v2 -> v4 is forward
+
+ Set<ForwardGroup> groups = computeForwardGroups(v1, v2, v3, v4);
+
+ checkGroupSize(groups, 1, 3);
+ }
+
+    /**
+     * Assigns a no-op invokable to every vertex, builds a dynamic execution graph from the
+     * vertices and returns the distinct forward groups computed for them.
+     *
+     * @param vertices the job vertices forming the job graph
+     * @return the set of distinct forward groups
+     */
+    private static Set<ForwardGroup> computeForwardGroups(JobVertex... vertices) throws Exception {
+        for (JobVertex vertex : vertices) {
+            vertex.setInvokableClass(NoOpInvokable.class);
+        }
+
+        final ExecutionGraph dynamicGraph = createDynamicGraph(vertices);
+
+        // The computed map's values are collected into a set to obtain the distinct groups.
+        return new HashSet<>(
+                ForwardGroupComputeUtil.computeForwardGroups(
+                                Arrays.asList(vertices), dynamicGraph::getJobVertex)
+                        .values());
+    }
+
+    /**
+     * Asserts the number of forward groups and, in any order, the size of each group.
+     *
+     * <p>The matcher has to be passed to {@code assertThat}; merely creating it checks nothing.
+     * The primitive sizes are boxed so that each one becomes an individual expected item.
+     *
+     * @param groups the computed forward groups
+     * @param numOfGroups the expected number of groups
+     * @param sizes the expected group sizes; empty when no group is expected
+     */
+    private static void checkGroupSize(Set<ForwardGroup> groups, int numOfGroups, int... sizes) {
+        assertEquals(numOfGroups, groups.size());
+        assertThat(
+                groups.stream().map(ForwardGroup::size).collect(Collectors.toList()),
+                containsInAnyOrder(Arrays.stream(sizes).boxed().toArray(Integer[]::new)));
+    }
+
+    /**
+     * Builds a dynamic execution graph for a job consisting of the given vertices.
+     *
+     * @param vertices the job vertices of the test job
+     * @return the dynamic execution graph
+     */
+    private static DefaultExecutionGraph createDynamicGraph(JobVertex... vertices)
+            throws Exception {
+        final JobGraph jobGraph = new JobGraph(new JobID(), "TestJob", vertices);
+
+        // NOTE(review): 10 is presumably the default max parallelism of the store - confirm.
+        return TestingDefaultExecutionGraphBuilder.newBuilder()
+                .setJobGraph(jobGraph)
+                .setVertexParallelismStore(
+                        AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
+                                Arrays.asList(vertices), 10))
+                .buildDynamicGraph();
+    }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtilsTest.java
new file mode 100644
index 0000000..ba9bdb8
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobType;
+import
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import
org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+/** Tests for {@link SlotSelectionStrategyUtils}. */
+public class SlotSelectionStrategyUtilsTest extends TestLogger {
+
+    /** With local recovery enabled, a streaming job gets the previous-allocation strategy. */
+    @Test
+    public void testCreatePreviousAllocationSlotSelectionStrategyForLocalRecoveryStreamingJob() {
+        assertThat(
+                selectWithLocalRecoveryEnabled(JobType.STREAMING),
+                instanceOf(PreviousAllocationSlotSelectionStrategy.class));
+    }
+
+    /** With local recovery enabled, a batch job still gets the location-preference strategy. */
+    @Test
+    public void testCreateLocationPreferenceSlotSelectionStrategyForLocalRecoveryBatchJob() {
+        assertThat(
+                selectWithLocalRecoveryEnabled(JobType.BATCH),
+                instanceOf(LocationPreferenceSlotSelectionStrategy.class));
+    }
+
+    /** Selects the strategy for the given job type using a config with local recovery on. */
+    private static SlotSelectionStrategy selectWithLocalRecoveryEnabled(JobType jobType) {
+        final Configuration configuration = new Configuration();
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+        return SlotSelectionStrategyUtils.selectSlotSelectionStrategy(jobType, configuration);
+    }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index af1f27d..af9994d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -943,6 +943,7 @@ public class StreamingJobGraphGenerator {
// set strategy name so that web interface can show it.
jobEdge.setShipStrategyName(partitioner.toString());
jobEdge.setBroadcast(partitioner.isBroadcast());
+ jobEdge.setForward(partitioner instanceof ForwardPartitioner);
jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
new file mode 100644
index 0000000..10d5400
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT case for {@link AdaptiveBatchScheduler}. */
+public class AdaptiveBatchSchedulerITCase extends TestLogger {
+
+ private static final int DEFAULT_MAX_PARALLELISM = 4;
+ private static final int SOURCE_PARALLELISM_1 = 2;
+ private static final int SOURCE_PARALLELISM_2 = 8;
+ private static final int NUMBERS_TO_PRODUCE = 10000;
+
+ private static ConcurrentLinkedQueue<Map<Long, Long>> numberCountResults;
+
+ private Map<Long, Long> expectedResult;
+
+ @Before
+ public void setUp() {
+ expectedResult = // both sources emit 0..NUMBERS_TO_PRODUCE-1 and are unioned, so each number is expected twice
+ LongStream.range(0, NUMBERS_TO_PRODUCE)
+ .boxed()
+ .collect(Collectors.toMap(Function.identity(), i ->
2L));
+
+ numberCountResults = new ConcurrentLinkedQueue<>(); // static field filled by NumberCounter#close; replaced before every test
+ }
+
+ @Test
+ public void testSchedulingWithUnknownResource() throws Exception {
+ testScheduling(false); // false: slot sharing groups are created without any resource settings
+ }
+
+ @Test
+ public void testSchedulingWithFineGrainedResource() throws Exception {
+ testScheduling(true); // true: fine-grained resource management is enabled and the groups declare CPU and heap
+ }
+
+    /**
+     * Runs the job and verifies that, summed over all map subtasks, every produced number was
+     * counted as often as expected.
+     *
+     * @param isFineGrained whether the job runs with fine-grained resource management
+     */
+    public void testScheduling(Boolean isFineGrained) throws Exception {
+        executeJob(isFineGrained);
+
+        // Every map subtask publishes its own partial counts; merge them into global counts.
+        Map<Long, Long> numberCountResultMap =
+                numberCountResults.stream()
+                        .flatMap(map -> map.entrySet().stream())
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, Map.Entry::getValue, Long::sum));
+
+        // Print each mismatching number to ease debugging. The index must be a long: the maps
+        // are keyed by Long, so an int index would be boxed to Integer and never find an entry.
+        // The counts are compared with equals() because they are boxed values.
+        for (long i = 0; i < NUMBERS_TO_PRODUCE; i++) {
+            final Long actualCount = numberCountResultMap.get(i);
+            if (!expectedResult.get(i).equals(actualCount)) {
+                System.out.println(i + ": " + actualCount);
+            }
+        }
+        assertThat(numberCountResultMap, equalTo(expectedResult));
+    }
+
+ private void executeJob(Boolean isFineGrained) throws Exception { // builds and runs the two-source union job on a local environment
+ final Configuration configuration = new Configuration();
+ configuration.setString(RestOptions.BIND_PORT, "0"); // NOTE(review): presumably lets the REST endpoint bind to any free port - confirm
+ configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+ configuration.set(
+ JobManagerOptions.SCHEDULER,
JobManagerOptions.SchedulerType.AdaptiveBatch);
+ configuration.setInteger(
+ JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM,
+ DEFAULT_MAX_PARALLELISM);
+ configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE,
MemorySize.parse("4kb"));
+ configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+
+ if (isFineGrained) {
+
configuration.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, true);
+
configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);
+ }
+
+ final StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.createLocalEnvironment(configuration);
+ env.setParallelism(-1); // NOTE(review): presumably leaves the parallelism undecided so the scheduler picks it - confirm
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
+
+ for (int i = 0; i < 3; ++i) { // one group each for source1, source2 and the map operator (see the get(0..2) calls below)
+ SlotSharingGroup group;
+ if (isFineGrained) {
+ group =
+ SlotSharingGroup.newBuilder("group" + i)
+ .setCpuCores(1.0)
+ .setTaskHeapMemory(MemorySize.parse("100m"))
+ .build();
+ } else {
+ group = SlotSharingGroup.newBuilder("group" + i).build();
+ }
+ slotSharingGroups.add(group);
+ }
+
+ final DataStream<Long> source1 =
+ env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+ .setParallelism(SOURCE_PARALLELISM_1)
+ .name("source1")
+ .slotSharingGroup(slotSharingGroups.get(0));
+
+ final DataStream<Long> source2 =
+ env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+ .setParallelism(SOURCE_PARALLELISM_2)
+ .name("source2")
+ .slotSharingGroup(slotSharingGroups.get(1));
+
+ source1.union(source2)
+ .rescale()
+ .map(new NumberCounter()) // no parallelism is set on the map, unlike on the two sources
+ .name("map")
+ .slotSharingGroup(slotSharingGroups.get(2));
+
+ env.execute();
+ }
+
+    /**
+     * Pass-through map function that counts how often it has seen each value and publishes the
+     * counts to the static {@code numberCountResults} queue when it is closed.
+     */
+    private static class NumberCounter extends RichMapFunction<Long, Long> {
+
+        private final Map<Long, Long> numberCountResult = new HashMap<>();
+
+        @Override
+        public Long map(Long value) throws Exception {
+            // Start at 1 for a new value, otherwise increment the existing count.
+            numberCountResult.merge(value, 1L, Long::sum);
+            return value;
+        }
+
+        @Override
+        public void close() throws Exception {
+            numberCountResults.add(numberCountResult);
+        }
+    }
+}