[https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671441#comment-16671441]
ASF GitHub Bot commented on FLINK-9635:
---------------------------------------
tillrohrmann closed pull request #6972: [FLINK-9635][scheduling] Avoid task spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972
This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, it is displayed below for the sake of provenance.
As this is a foreign pull request (from a fork), GitHub would not otherwise show the diff, so it is supplied below:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.Set;
/**
* A slot profile describes the profile of a slot into which a task wants to
be scheduled. The profile contains
@@ -47,16 +48,30 @@
/** This contains desired allocation ids of the slot. */
@Nonnull
- private final Collection<AllocationID> priorAllocations;
+ private final Collection<AllocationID> preferredAllocations;
+
+ /** This contains all prior allocation ids from the whole execution
graph. */
+ @Nonnull
+ private final Set<AllocationID> previousExecutionGraphAllocations;
+
+ public SlotProfile(
+ @Nonnull ResourceProfile resourceProfile,
+ @Nonnull Collection<TaskManagerLocation> preferredLocations,
+ @Nonnull Collection<AllocationID> preferredAllocations) {
+
+ this(resourceProfile, preferredLocations, preferredAllocations,
Collections.emptySet());
+ }
public SlotProfile(
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<TaskManagerLocation> preferredLocations,
- @Nonnull Collection<AllocationID> priorAllocations) {
+ @Nonnull Collection<AllocationID> preferredAllocations,
+ @Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
this.resourceProfile = resourceProfile;
this.preferredLocations = preferredLocations;
- this.priorAllocations = priorAllocations;
+ this.preferredAllocations = preferredAllocations;
+ this.previousExecutionGraphAllocations =
previousExecutionGraphAllocations;
}
/**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
* Returns the desired allocation ids for the slot.
*/
@Nonnull
- public Collection<AllocationID> getPriorAllocations() {
- return priorAllocations;
+ public Collection<AllocationID> getPreferredAllocations() {
+ return preferredAllocations;
+ }
+
+ /**
+ * Returns a set of all previous allocation ids from the execution
graph.
+ *
+ * This is optional and can be empty if unused.
+ */
+ @Nonnull
+ public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+ return previousExecutionGraphAllocations;
}
/**
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
import org.slf4j.Logger;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -65,6 +66,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore
taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
- LocationPreferenceConstraint.ANY);
+ LocationPreferenceConstraint.ANY,
+ Collections.emptySet());
}
/**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable
JobManagerTaskRestore taskRestore) {
* @param queued Flag to indicate whether the scheduler may queue this
task if it cannot
* immediately deploy it.
* @param locationPreferenceConstraint constraint for the location
preferences
+ * @param allPreviousExecutionGraphAllocationIds set with all previous
allocation ids in the job graph.
+ * Can be empty if the
allocation ids are not required for scheduling.
* @return Future which is completed once the Execution has been
deployed
*/
public CompletableFuture<Void> scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
- LocationPreferenceConstraint
locationPreferenceConstraint) {
+ LocationPreferenceConstraint
locationPreferenceConstraint,
+ @Nonnull Set<AllocationID>
allPreviousExecutionGraphAllocationIds) {
final Time allocationTimeout =
vertex.getExecutionGraph().getAllocationTimeout();
try {
final CompletableFuture<Execution> allocationFuture =
allocateAndAssignSlotForExecution(
slotProvider,
queued,
locationPreferenceConstraint,
+ allPreviousExecutionGraphAllocationIds,
allocationTimeout);
// IMPORTANT: We have to use the synchronous handle
operation (direct executor) here so
@@ -441,6 +448,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore
taskRestore) {
* @param slotProvider to obtain a new slot from
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location
preferences
+ * @param allPreviousExecutionGraphAllocationIds set with all previous
allocation ids in the job graph.
+ * Can be empty if the
allocation ids are not required for scheduling.
* @param allocationTimeout rpcTimeout for allocating a new slot
* @return Future which is completed with this execution once the slot
has been assigned
* or with an exception if an error occurred.
@@ -450,6 +459,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore
taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint
locationPreferenceConstraint,
+ @Nonnull Set<AllocationID>
allPreviousExecutionGraphAllocationIds,
Time allocationTimeout) throws
IllegalExecutionStateException {
checkNotNull(slotProvider);
@@ -495,7 +505,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore
taskRestore) {
new SlotProfile(
ResourceProfile.UNKNOWN,
preferredLocations,
-
previousAllocationIDs),
+
previousAllocationIDs,
+
allPreviousExecutionGraphAllocationIds),
allocationTimeout));
// register call back to cancel slot request in case
that the execution gets canceled
@@ -739,7 +750,8 @@ else if (numConsumers == 0) {
consumerVertex.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
-
LocationPreferenceConstraint.ANY); // there must be at least one known location
+
LocationPreferenceConstraint.ANY, // there must be at least one known location
+
Collections.emptySet());
} catch (Throwable t) {
consumerVertex.fail(new
IllegalStateException("Could not schedule consumer " +
"vertex
" + consumerVertex, t));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0be1ff27420..3b55e009116 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
@@ -60,6 +61,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -91,6 +93,7 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -902,14 +905,14 @@ public void scheduleForExecution() throws JobException {
private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider)
{
final ArrayList<CompletableFuture<Void>> schedulingFutures =
new ArrayList<>(numVerticesTotal);
-
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
final CompletableFuture<Void>
schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
- LocationPreferenceConstraint.ALL); //
since it is an input vertex, the input based location preferences should be
empty
+ LocationPreferenceConstraint.ALL,//
since it is an input vertex, the input based location preferences should be
empty
+ Collections.emptySet());
schedulingFutures.add(schedulingJobVertexFuture);
}
@@ -939,6 +942,9 @@ public void scheduleForExecution() throws JobException {
// collecting all the slots may resize and fail in that
operation without slots getting lost
final ArrayList<CompletableFuture<Execution>>
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+ final Set<AllocationID> allPreviousAllocationIds =
+
Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());
+
// allocate the slots (obtain all their futures
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
// these calls are not blocking, they only return
futures
@@ -946,7 +952,8 @@ public void scheduleForExecution() throws JobException {
slotProvider,
queued,
LocationPreferenceConstraint.ALL,
- allocationTimeout);
+ allPreviousAllocationIds,
+ timeout);
allAllocationFutures.addAll(allocationFutures);
}
@@ -1676,6 +1683,35 @@ public void updateAccumulators(AccumulatorSnapshot
accumulatorSnapshot) {
}
}
+ /**
+ * Computes and returns a set with the prior allocation ids from all
execution vertices in the graph.
+ */
+ private Set<AllocationID> computeAllPriorAllocationIds() {
+ HashSet<AllocationID> allPreviousAllocationIds = new
HashSet<>(getNumberOfExecutionJobVertices());
+ for (ExecutionVertex executionVertex :
getAllExecutionVertices()) {
+ AllocationID latestPriorAllocation =
executionVertex.getLatestPriorAllocation();
+ if (latestPriorAllocation != null) {
+
allPreviousAllocationIds.add(latestPriorAllocation);
+ }
+ }
+ return allPreviousAllocationIds;
+ }
+
+ /**
+ * Returns the result of {@link #computeAllPriorAllocationIds()}, but
only if the scheduling really requires it.
+ * Otherwise this method simply returns an empty set.
+ */
+ private Set<AllocationID>
computeAllPriorAllocationIdsIfRequiredByScheduling() {
+ // This is a temporary optimization to avoid computing all
previous allocations if not required
+ // This can go away when we progress with the implementation of
the Scheduler.
+ if (slotProvider instanceof SlotPool.ProviderAndOwner
+ && ((SlotPool.ProviderAndOwner)
slotProvider).requiresPreviousAllocationsForScheduling()) {
+ return computeAllPriorAllocationIds();
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
//
--------------------------------------------------------------------------------------------
// Listeners & Observers
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6da1e0db892..2ab1d686410 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -54,6 +55,7 @@
import org.slf4j.Logger;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -64,6 +66,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
@@ -474,12 +477,15 @@ public void
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
* @param slotProvider to allocate the slots from
* @param queued if the allocations can be queued
* @param locationPreferenceConstraint constraint for the location
preferences
+ * @param allPreviousExecutionGraphAllocationIds set with all previous
allocation ids in the job graph.
+ * Can be empty if the
allocation ids are not required for scheduling.
* @return Future which is completed once all {@link Execution} could
be deployed
*/
public CompletableFuture<Void> scheduleAll(
SlotProvider slotProvider,
boolean queued,
- LocationPreferenceConstraint
locationPreferenceConstraint) {
+ LocationPreferenceConstraint
locationPreferenceConstraint,
+ @Nonnull Set<AllocationID>
allPreviousExecutionGraphAllocationIds) {
final ExecutionVertex[] vertices = this.taskVertices;
@@ -487,7 +493,11 @@ public void
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
// kick off the tasks
for (ExecutionVertex ev : vertices) {
-
scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued,
locationPreferenceConstraint));
+ scheduleFutures.add(ev.scheduleForExecution(
+ slotProvider,
+ queued,
+ locationPreferenceConstraint,
+ allPreviousExecutionGraphAllocationIds));
}
return FutureUtils.waitForAll(scheduleFutures);
@@ -503,12 +513,14 @@ public void
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
* @param resourceProvider The resource provider from whom the slots
are requested.
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location
preferences
+ * @param allPreviousExecutionGraphAllocationIds the allocation ids of
all previous executions in the execution job graph.
* @param allocationTimeout timeout for allocating the individual slots
*/
public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
SlotProvider resourceProvider,
boolean queued,
LocationPreferenceConstraint
locationPreferenceConstraint,
+ @Nonnull Set<AllocationID>
allPreviousExecutionGraphAllocationIds,
Time allocationTimeout) {
final ExecutionVertex[] vertices = this.taskVertices;
final CompletableFuture<Execution>[] slots = new
CompletableFuture[vertices.length];
@@ -522,6 +534,7 @@ public void
connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
resourceProvider,
queued,
locationPreferenceConstraint,
+ allPreviousExecutionGraphAllocationIds,
allocationTimeout);
slots[i] = allocationFuture;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e4228011830..a0747296c53 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -56,6 +56,7 @@
import org.slf4j.Logger;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -617,17 +618,21 @@ public Execution resetForNewExecution(final long
timestamp, final long originati
* @param slotProvider to allocate the slots from
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location
preferences
+ * @param allPreviousExecutionGraphAllocationIds set with all previous
allocation ids in the job graph.
+ * Can be empty if the
allocation ids are not required for scheduling.
* @return Future which is completed once the execution is deployed.
The future
* can also completed exceptionally.
*/
public CompletableFuture<Void> scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
- LocationPreferenceConstraint
locationPreferenceConstraint) {
+ LocationPreferenceConstraint
locationPreferenceConstraint,
+ @Nonnull Set<AllocationID>
allPreviousExecutionGraphAllocationIds) {
return this.currentExecution.scheduleForExecution(
slotProvider,
queued,
- locationPreferenceConstraint);
+ locationPreferenceConstraint,
+ allPreviousExecutionGraphAllocationIds);
}
@VisibleForTesting
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 0b00c0e039d..f3ba48e75ab 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph.failover;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -212,6 +213,15 @@ private void restart(long globalModVersionOfFailover) {
connectedExecutionVertexes, false, false);
}
*/
+
+ HashSet<AllocationID>
previousAllocationsInRegion = new HashSet<>(connectedExecutionVertexes.size());
+ for (ExecutionVertex connectedExecutionVertex :
connectedExecutionVertexes) {
+ AllocationID latestPriorAllocation =
connectedExecutionVertex.getLatestPriorAllocation();
+ if (latestPriorAllocation != null) {
+
previousAllocationsInRegion.add(latestPriorAllocation);
+ }
+ }
+
//TODO, use restart strategy to schedule them.
//restart all connected ExecutionVertexes
for (ExecutionVertex ev :
connectedExecutionVertexes) {
@@ -219,7 +229,8 @@ private void restart(long globalModVersionOfFailover) {
ev.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
-
LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover
region might have failed concurrently
+
LocationPreferenceConstraint.ANY,
+
previousAllocationsInRegion); // some inputs not belonging to the failover
region might have failed concurrently
}
catch (Throwable e) {
failover(globalModVersionOfFailover);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
index 95dd1f6f9e6..282fd2ccf4e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
index 38781676842..8777edd2fe7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
@@ -18,37 +18,14 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
/**
* Interface for the context of a {@link LogicalSlot}. This context contains
information
* about the underlying allocated slot and how to communicate with the
TaskManager on which
* it was allocated.
*/
-public interface SlotContext {
- /**
- * Gets the id under which the slot has been allocated on the
TaskManager. This id uniquely identifies the
- * physical slot.
- *
- * @return The id under which the slot has been allocated on the
TaskManager
- */
- AllocationID getAllocationId();
-
- /**
- * Gets the location info of the TaskManager that offers this slot.
- *
- * @return The location info of the TaskManager that offers this slot
- */
- TaskManagerLocation getTaskManagerLocation();
-
- /**
- * Gets the number of the slot.
- *
- * @return The number of the slot on the TaskManager.
- */
- int getPhysicalSlotNumber();
+public interface SlotContext extends SlotInfo {
/**
* Gets the actor gateway that can be used to send messages to the
TaskManager.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
new file mode 100644
index 00000000000..fd33aacfd6c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface that provides basic information in the context of a slot.
+ */
+public interface SlotInfo {
+
+ /**
+ * Gets the id under which the slot has been allocated on the
TaskManager. This id uniquely identifies the
+ * physical slot.
+ *
+ * @return The id under which the slot has been allocated on the
TaskManager
+ */
+ AllocationID getAllocationId();
+
+ /**
+ * Gets the location info of the TaskManager that offers this slot.
+ *
+ * @return The location info of the TaskManager that offers this slot
+ */
+ TaskManagerLocation getTaskManagerLocation();
+
+ /**
+ * Gets the number of the slot.
+ *
+ * @return The number of the slot on the TaskManager.
+ */
+ int getPhysicalSlotNumber();
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
index 75195cd9378..e4e583c09d6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -92,6 +92,7 @@ public SlotID getSlotId() {
*
* @return The ID under which the slot is allocated
*/
+ @Override
public AllocationID getAllocationId() {
return allocationId;
}
@@ -121,6 +122,7 @@ public ResourceProfile getResourceProfile() {
*
* @return The location info of the TaskManager that offers this slot
*/
+ @Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}
@@ -132,6 +134,7 @@ public TaskManagerLocation getTaskManagerLocation() {
*
* @return The actor gateway that can be used to send messages to the
TaskManager.
*/
+ @Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
@@ -142,6 +145,7 @@ public TaskManagerGateway getTaskManagerGateway() {
*
* @return Physical slot number of the allocated slot
*/
+ @Override
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
index 25e884c32dd..bc90be65ac5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
@@ -21,7 +21,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import javax.annotation.Nonnull;
@@ -34,6 +34,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -54,11 +55,26 @@
@Override
public <IN, OUT> OUT findMatchWithLocality(
@Nonnull SlotProfile slotProfile,
- @Nonnull Stream<IN> candidates,
- @Nonnull Function<IN, SlotContext> contextExtractor,
+ @Nonnull Supplier<Stream<IN>> candidates,
+ @Nonnull Function<IN, SlotInfo> contextExtractor,
@Nonnull Predicate<IN> additionalRequirementsFilter,
@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
+ return doFindMatchWithLocality(
+ slotProfile,
+ candidates.get(),
+ contextExtractor,
+ additionalRequirementsFilter,
+ resultProducer);
+ }
+
+ @Nullable
+ protected <IN, OUT> OUT doFindMatchWithLocality(
+ @Nonnull SlotProfile slotProfile,
+ @Nonnull Stream<IN> candidates,
+ @Nonnull Function<IN, SlotInfo> contextExtractor,
+ @Nonnull Predicate<IN> additionalRequirementsFilter,
+ @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
Collection<TaskManagerLocation> locationPreferences =
slotProfile.getPreferredLocations();
// if we have no location preferences, we can only filter by
the additional requirements.
@@ -88,7 +104,7 @@
while (iterator.hasNext()) {
IN candidate = iterator.next();
if (additionalRequirementsFilter.test(candidate)) {
- SlotContext slotContext =
contextExtractor.apply(candidate);
+ SlotInfo slotContext =
contextExtractor.apply(candidate);
// this gets candidate is local-weigh
Integer localWeigh =
preferredResourceIDs.getOrDefault(slotContext.getTaskManagerLocation().getResourceID(),
0);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
index 9b1872ec363..d2193ad7022 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
@@ -21,15 +21,17 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -48,35 +50,68 @@ private PreviousAllocationSchedulingStrategy() {}
@Override
public <IN, OUT> OUT findMatchWithLocality(
@Nonnull SlotProfile slotProfile,
- @Nonnull Stream<IN> candidates,
- @Nonnull Function<IN, SlotContext> contextExtractor,
+ @Nonnull Supplier<Stream<IN>> candidates,
+ @Nonnull Function<IN, SlotInfo> contextExtractor,
@Nonnull Predicate<IN> additionalRequirementsFilter,
@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
- Collection<AllocationID> priorAllocations =
slotProfile.getPriorAllocations();
+ Collection<AllocationID> priorAllocations =
slotProfile.getPreferredAllocations();
if (priorAllocations.isEmpty()) {
- return super.findMatchWithLocality(slotProfile,
candidates, contextExtractor, additionalRequirementsFilter, resultProducer);
+ return super.findMatchWithLocality(
+ slotProfile,
+ candidates,
+ contextExtractor,
+ additionalRequirementsFilter,
+ resultProducer);
} else {
- return findPreviousAllocation(candidates,
contextExtractor, additionalRequirementsFilter, resultProducer,
priorAllocations);
+ return findPreviousAllocation(
+ slotProfile,
+ candidates,
+ contextExtractor,
+ additionalRequirementsFilter,
+ resultProducer,
+ priorAllocations);
}
}
@Nullable
private <IN, OUT> OUT findPreviousAllocation(
- @Nonnull Stream<IN> candidates,
- @Nonnull Function<IN, SlotContext> contextExtractor,
+ @Nonnull SlotProfile slotProfile,
+ @Nonnull Supplier<Stream<IN>> candidates,
+ @Nonnull Function<IN, SlotInfo> contextExtractor,
@Nonnull Predicate<IN> additionalRequirementsFilter,
@Nonnull BiFunction<IN, Locality, OUT> resultProducer,
- Collection<AllocationID> priorAllocations) {
+ @Nonnull Collection<AllocationID> priorAllocations) {
+
Predicate<IN> filterByAllocation =
- (candidate) ->
priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
+ (IN candidate) ->
priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
- return candidates
+ OUT previousAllocationCandidate = candidates
+ .get()
.filter(filterByAllocation.and(additionalRequirementsFilter))
.findFirst()
- .map((result) -> resultProducer.apply(result,
Locality.LOCAL)) // TODO introduce special locality?
+ .map((IN result) -> resultProducer.apply(result,
Locality.LOCAL)) // TODO introduce special locality?
.orElse(null);
+
+ if (previousAllocationCandidate != null) {
+ return previousAllocationCandidate;
+ }
+
+ Set<AllocationID> blackListedAllocationIDs =
slotProfile.getPreviousExecutionGraphAllocations();
+ Stream<IN> candidateStream = candidates.get();
+ if (!blackListedAllocationIDs.isEmpty()) {
+ candidateStream = candidateStream.filter(
+ (IN candidate) ->
!blackListedAllocationIDs.contains(
+
contextExtractor.apply(candidate).getAllocationId()));
+ }
+
+ return doFindMatchWithLocality(
+ slotProfile,
+ candidateStream,
+ contextExtractor,
+ additionalRequirementsFilter,
+ resultProducer);
}
public static PreviousAllocationSchedulingStrategy getInstance() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
index fb27a214eff..89b2d05cada 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
@@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -28,6 +29,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -53,8 +55,8 @@
@Nullable
<IN, OUT> OUT findMatchWithLocality(
@Nonnull SlotProfile slotProfile,
- @Nonnull Stream<IN> candidates,
- @Nonnull Function<IN, SlotContext> contextExtractor,
+ @Nonnull Supplier<Stream<IN>> candidates,
+ @Nonnull Function<IN, SlotInfo> contextExtractor,
@Nonnull Predicate<IN> additionalRequirementsFilter,
@Nonnull BiFunction<IN, Locality, OUT> resultProducer);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 13f0462455c..fdf0fa9e485 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -54,6 +54,7 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -170,7 +171,9 @@ public SlotPool(
this.pendingRequests = new DualKeyMap<>(16);
this.waitingForResourceManager = new HashMap<>(16);
- this.providerAndOwner = new
ProviderAndOwner(getSelfGateway(SlotPoolGateway.class));
+ this.providerAndOwner = new ProviderAndOwner(
+ getSelfGateway(SlotPoolGateway.class),
+ schedulingStrategy instanceof
PreviousAllocationSchedulingStrategy);
this.slotSharingManagers = new HashMap<>(4);
@@ -325,76 +328,93 @@ public void disconnectResourceManager() {
log.debug("Received slot request [{}] for task: {}",
slotRequestId, task.getTaskToExecute());
- final SlotSharingGroupId slotSharingGroupId =
task.getSlotSharingGroupId();
+ if (task.getSlotSharingGroupId() == null) {
+ return allocateSingleSlot(slotRequestId, slotProfile,
allowQueuedScheduling, allocationTimeout);
+ } else {
+ return allocateSharedSlot(slotRequestId, task,
slotProfile, allowQueuedScheduling, allocationTimeout);
+ }
+ }
- if (slotSharingGroupId != null) {
- // allocate slot with slot sharing
- final SlotSharingManager multiTaskSlotManager =
slotSharingManagers.computeIfAbsent(
- slotSharingGroupId,
- id -> new SlotSharingManager(
- id,
- this,
- providerAndOwner));
-
- final SlotSharingManager.MultiTaskSlotLocality
multiTaskSlotLocality;
-
- try {
- if (task.getCoLocationConstraint() != null) {
- multiTaskSlotLocality =
allocateCoLocatedMultiTaskSlot(
- task.getCoLocationConstraint(),
- multiTaskSlotManager,
- slotProfile,
- allowQueuedScheduling,
- allocationTimeout);
+ private CompletableFuture<LogicalSlot> allocateSingleSlot(
+ SlotRequestId slotRequestId,
+ SlotProfile slotProfile,
+ boolean allowQueuedScheduling,
+ Time allocationTimeout) {
+ // request an allocated slot to assign a single logical slot to
+ CompletableFuture<SlotAndLocality> slotAndLocalityFuture =
requestAllocatedSlot(
+ slotRequestId,
+ slotProfile,
+ allowQueuedScheduling,
+ allocationTimeout);
+
+ return slotAndLocalityFuture.thenApply(
+ (SlotAndLocality slotAndLocality) -> {
+ final AllocatedSlot allocatedSlot =
slotAndLocality.getSlot();
+
+ final SingleLogicalSlot singleTaskSlot = new
SingleLogicalSlot(
+ slotRequestId,
+ allocatedSlot,
+ null,
+ slotAndLocality.getLocality(),
+ providerAndOwner);
+
+ if
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
+ return singleTaskSlot;
} else {
- multiTaskSlotLocality =
allocateMultiTaskSlot(
- task.getJobVertexId(),
- multiTaskSlotManager,
- slotProfile,
- allowQueuedScheduling,
- allocationTimeout);
+ final FlinkException flinkException =
+ new FlinkException("Could not
assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
+ releaseSingleSlot(slotRequestId,
flinkException);
+ throw new
CompletionException(flinkException);
}
- } catch (NoResourceAvailableException
noResourceException) {
- return
FutureUtils.completedExceptionally(noResourceException);
- }
+ });
+ }
- // sanity check
-
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()));
+ private CompletableFuture<LogicalSlot> allocateSharedSlot(
+ SlotRequestId slotRequestId,
+ ScheduledUnit task,
+ SlotProfile slotProfile,
+ boolean allowQueuedScheduling,
+ Time allocationTimeout) {
- final SlotSharingManager.SingleTaskSlot leaf =
multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
- slotRequestId,
- task.getJobVertexId(),
- multiTaskSlotLocality.getLocality());
+ // allocate slot with slot sharing
+ final SlotSharingManager multiTaskSlotManager =
slotSharingManagers.computeIfAbsent(
+ task.getSlotSharingGroupId(),
+ id -> new SlotSharingManager(
+ id,
+ this,
+ providerAndOwner));
- return leaf.getLogicalSlotFuture();
- } else {
- // request an allocated slot to assign a single logical
slot to
- CompletableFuture<SlotAndLocality>
slotAndLocalityFuture = requestAllocatedSlot(
- slotRequestId,
- slotProfile,
- allowQueuedScheduling,
- allocationTimeout);
+ final SlotSharingManager.MultiTaskSlotLocality
multiTaskSlotLocality;
- return slotAndLocalityFuture.thenApply(
- (SlotAndLocality slotAndLocality) -> {
- final AllocatedSlot allocatedSlot =
slotAndLocality.getSlot();
+ try {
+ if (task.getCoLocationConstraint() != null) {
+ multiTaskSlotLocality =
allocateCoLocatedMultiTaskSlot(
+ task.getCoLocationConstraint(),
+ multiTaskSlotManager,
+ slotProfile,
+ allowQueuedScheduling,
+ allocationTimeout);
+ } else {
+ multiTaskSlotLocality = allocateMultiTaskSlot(
+ task.getJobVertexId(),
+ multiTaskSlotManager,
+ slotProfile,
+ allowQueuedScheduling,
+ allocationTimeout);
+ }
+ } catch (NoResourceAvailableException noResourceException) {
+ return
FutureUtils.completedExceptionally(noResourceException);
+ }
- final SingleLogicalSlot singleTaskSlot
= new SingleLogicalSlot(
- slotRequestId,
- allocatedSlot,
- null,
- slotAndLocality.getLocality(),
- providerAndOwner);
+ // sanity check
+
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()));
- if
(allocatedSlot.tryAssignPayload(singleTaskSlot)) {
- return singleTaskSlot;
- } else {
- final FlinkException
flinkException = new FlinkException("Could not assign payload to allocated slot
" + allocatedSlot.getAllocationId() + '.');
- releaseSlot(slotRequestId,
null, flinkException);
- throw new
CompletionException(flinkException);
- }
- });
- }
+ final SlotSharingManager.SingleTaskSlot leaf =
multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
+ slotRequestId,
+ task.getJobVertexId(),
+ multiTaskSlotLocality.getLocality());
+
+ return leaf.getLogicalSlotFuture();
}
/**
@@ -438,12 +458,13 @@ public void disconnectResourceManager() {
slotProfile = new SlotProfile(
slotProfile.getResourceProfile(),
Collections.singleton(coLocationConstraint.getLocation()),
- slotProfile.getPriorAllocations());
+ slotProfile.getPreferredAllocations());
}
// get a new multi task slot
final SlotSharingManager.MultiTaskSlotLocality
multiTaskSlotLocality = allocateMultiTaskSlot(
- coLocationConstraint.getGroupId(), multiTaskSlotManager,
+ coLocationConstraint.getGroupId(),
+ multiTaskSlotManager,
slotProfile,
allowQueuedScheduling,
allocationTimeout);
@@ -548,9 +569,8 @@ public void disconnectResourceManager() {
if (multiTaskSlotLocality != null) {
// prefer slot sharing group slots over unused slots
if (polledSlotAndLocality != null) {
- releaseSlot(
+ releaseSingleSlot(
allocatedSlotRequestId,
- null,
new FlinkException("Locality constraint
is not better fulfilled by allocated slot."));
}
return multiTaskSlotLocality;
@@ -587,9 +607,8 @@ public void disconnectResourceManager() {
}
}
} else {
- releaseSlot(
+ releaseSingleSlot(
allocatedSlotRequestId,
- null,
new
FlinkException("Could not find task slot with " + multiTaskSlotRequestId +
'.'));
}
});
@@ -741,41 +760,56 @@ private void stashRequestWaitingForResourceManager(final
PendingRequest pendingR
//
------------------------------------------------------------------------
@Override
- public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable
cause) {
+ public CompletableFuture<Acknowledge> releaseSlot(
+ SlotRequestId slotRequestId,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ Throwable cause) {
+
log.debug("Releasing slot [{}] because: {}", slotRequestId,
cause != null ? cause.getMessage() : "null");
if (slotSharingGroupId != null) {
- final SlotSharingManager multiTaskSlotManager =
slotSharingManagers.get(slotSharingGroupId);
+ releaseSharedSlot(slotRequestId, slotSharingGroupId,
cause);
+ } else {
+ releaseSingleSlot(slotRequestId, cause);
+ }
- if (multiTaskSlotManager != null) {
- final SlotSharingManager.TaskSlot taskSlot =
multiTaskSlotManager.getTaskSlot(slotRequestId);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
- if (taskSlot != null) {
- taskSlot.release(cause);
- } else {
- log.debug("Could not find slot [{}] in
slot sharing group {}. Ignoring release slot request.", slotRequestId,
slotSharingGroupId);
- }
+ private void releaseSharedSlot(
+ SlotRequestId slotRequestId,
+ @Nonnull SlotSharingGroupId slotSharingGroupId, Throwable
cause) {
+
+ final SlotSharingManager multiTaskSlotManager =
slotSharingManagers.get(slotSharingGroupId);
+
+ if (multiTaskSlotManager != null) {
+ final SlotSharingManager.TaskSlot taskSlot =
multiTaskSlotManager.getTaskSlot(slotRequestId);
+
+ if (taskSlot != null) {
+ taskSlot.release(cause);
} else {
- log.debug("Could not find slot sharing group
{}. Ignoring release slot request.", slotSharingGroupId);
+ log.debug("Could not find slot [{}] in slot
sharing group {}. Ignoring release slot request.", slotRequestId,
slotSharingGroupId);
}
} else {
- final PendingRequest pendingRequest =
removePendingRequest(slotRequestId);
+ log.debug("Could not find slot sharing group {}.
Ignoring release slot request.", slotSharingGroupId);
+ }
+ }
- if (pendingRequest != null) {
- failPendingRequest(pendingRequest, new
FlinkException("Pending slot request with " + slotRequestId + " has been
released."));
- } else {
- final AllocatedSlot allocatedSlot =
allocatedSlots.remove(slotRequestId);
+ private void releaseSingleSlot(SlotRequestId slotRequestId, Throwable
cause) {
+ final PendingRequest pendingRequest =
removePendingRequest(slotRequestId);
- if (allocatedSlot != null) {
- allocatedSlot.releasePayload(cause);
-
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
- } else {
- log.debug("There is no allocated slot
[{}]. Ignoring the release slot request.", slotRequestId);
- }
+ if (pendingRequest != null) {
+ failPendingRequest(pendingRequest, new
FlinkException("Pending slot request with " + slotRequestId + " has been
released."));
+ } else {
+ final AllocatedSlot allocatedSlot =
allocatedSlots.remove(slotRequestId);
+
+ if (allocatedSlot != null) {
+ allocatedSlot.releasePayload(cause);
+
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+ } else {
+ log.debug("There is no allocated slot [{}].
Ignoring the release slot request.", slotRequestId);
}
}
-
- return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
@@ -875,17 +909,13 @@ private PendingRequest pollMatchingPendingRequest(final
AllocatedSlot slot) {
validateRunsInMainThread();
List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers
= offers.stream().map(
- offer -> {
- CompletableFuture<Optional<SlotOffer>>
acceptedSlotOffer = offerSlot(
- taskManagerLocation,
- taskManagerGateway,
- offer)
- .thenApply(
- (acceptedSlot) -> acceptedSlot
? Optional.of(offer) : Optional.empty()
- );
-
- return acceptedSlotOffer;
- }
+ offer -> offerSlot(
+ taskManagerLocation,
+ taskManagerGateway,
+ offer)
+ .<Optional<SlotOffer>>thenApply(
+ (acceptedSlot) -> acceptedSlot ?
Optional.of(offer) : Optional.empty()
+ )
).collect(Collectors.toList());
CompletableFuture<Collection<Optional<SlotOffer>>>
optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
@@ -1359,11 +1389,7 @@ int size() {
@VisibleForTesting
Set<AllocatedSlot> getSlotsForTaskManager(ResourceID
resourceId) {
- if
(allocatedSlotsByTaskManager.containsKey(resourceId)) {
- return
allocatedSlotsByTaskManager.get(resourceId);
- } else {
- return Collections.emptySet();
- }
+ return
allocatedSlotsByTaskManager.getOrDefault(resourceId, Collections.emptySet());
}
}
@@ -1404,18 +1430,12 @@ void add(final AllocatedSlot slot, final long
timestamp) {
final ResourceID resourceID =
slot.getTaskManagerLocation().getResourceID();
final String host =
slot.getTaskManagerLocation().getFQDNHostname();
- Set<AllocatedSlot> slotsForTaskManager =
availableSlotsByTaskManager.get(resourceID);
- if (slotsForTaskManager == null) {
- slotsForTaskManager = new HashSet<>();
-
availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
- }
+ Set<AllocatedSlot> slotsForTaskManager =
+
availableSlotsByTaskManager.computeIfAbsent(resourceID, k -> new HashSet<>());
slotsForTaskManager.add(slot);
- Set<AllocatedSlot> slotsForHost =
availableSlotsByHost.get(host);
- if (slotsForHost == null) {
- slotsForHost = new HashSet<>();
- availableSlotsByHost.put(host,
slotsForHost);
- }
+ Set<AllocatedSlot> slotsForHost =
+
availableSlotsByHost.computeIfAbsent(host, k -> new HashSet<>());
slotsForHost.add(slot);
}
else {
@@ -1456,7 +1476,7 @@ SlotAndLocality poll(SchedulingStrategy
schedulingStrategy, SlotProfile slotProf
SlotAndLocality matchingSlotAndLocality =
schedulingStrategy.findMatchWithLocality(
slotProfile,
- slotAndTimestamps.stream(),
+ slotAndTimestamps::stream,
SlotAndTimestamp::slot,
(SlotAndTimestamp slot) ->
slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
(SlotAndTimestamp slotAndTimestamp, Locality
locality) -> {
@@ -1563,12 +1583,18 @@ void clear() {
* An implementation of the {@link SlotOwner} and {@link SlotProvider}
interfaces
* that delegates methods as RPC calls to the SlotPool's RPC gateway.
*/
- private static class ProviderAndOwner implements SlotOwner,
SlotProvider {
+ public static class ProviderAndOwner implements SlotOwner, SlotProvider
{
private final SlotPoolGateway gateway;
+ private final boolean requiresPreviousAllocationsForScheduling;
- ProviderAndOwner(SlotPoolGateway gateway) {
+ ProviderAndOwner(SlotPoolGateway gateway, boolean
requiresPreviousAllocationsForScheduling) {
this.gateway = gateway;
+ this.requiresPreviousAllocationsForScheduling =
requiresPreviousAllocationsForScheduling;
+ }
+
+ public boolean requiresPreviousAllocationsForScheduling() {
+ return requiresPreviousAllocationsForScheduling;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..80c2dd2d708 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -197,7 +197,7 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID
groupId, SchedulingStrategy
Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues
= this.resolvedRootSlots.values();
return matcher.findMatchWithLocality(
slotProfile,
-
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+ ()
->resolvedRootSlotsValues.stream().flatMap(Collection::stream),
(MultiTaskSlot multiTaskSlot) ->
multiTaskSlot.getSlotContextFuture().join(),
(MultiTaskSlot multiTaskSlot) ->
!multiTaskSlot.contains(groupId),
MultiTaskSlotLocality::of);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
index 6a826aa23e1..ee491499b77 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
@@ -124,18 +124,59 @@ public void
matchPreviousAllocationOverridesPreferredLocation() {
}
@Test
- public void matchPreviousLocationNotAvailable() {
+ public void matchPreviousLocationNotAvailableButByLocality() {
SlotProfile slotProfile = new SlotProfile(resourceProfile,
Collections.singletonList(tml4), Collections.singletonList(aidX));
SlotContext match = runMatching(slotProfile);
- Assert.assertEquals(null, match);
+ Assert.assertEquals(ssc4, match);
+ }
+
+ @Test
+ public void matchPreviousLocationNotAvailableAndAllOthersBlacklisted() {
+ HashSet<AllocationID> blacklisted = new HashSet<>(4);
+ blacklisted.add(aid1);
+ blacklisted.add(aid2);
+ blacklisted.add(aid3);
+ blacklisted.add(aid4);
+ SlotProfile slotProfile = new SlotProfile(resourceProfile,
Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+ SlotContext match = runMatching(slotProfile);
+
+ // there should be no valid option left and we expect null as
return
+ Assert.assertNull(match);
+ }
+
+ @Test
+ public void matchPreviousLocationNotAvailableAndSomeOthersBlacklisted()
{
+ HashSet<AllocationID> blacklisted = new HashSet<>(3);
+ blacklisted.add(aid1);
+ blacklisted.add(aid3);
+ blacklisted.add(aid4);
+ SlotProfile slotProfile = new SlotProfile(resourceProfile,
Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+ SlotContext match = runMatching(slotProfile);
+
+ // we expect that the candidate that is not blacklisted is
returned
+ Assert.assertEquals(ssc2, match);
+ }
+
+ @Test
+ public void matchPreviousLocationAvailableButAlsoBlacklisted() {
+ HashSet<AllocationID> blacklisted = new HashSet<>(4);
+ blacklisted.add(aid1);
+ blacklisted.add(aid2);
+ blacklisted.add(aid3);
+ blacklisted.add(aid4);
+ SlotProfile slotProfile = new SlotProfile(resourceProfile,
Collections.singletonList(tml3), Collections.singletonList(aid3), blacklisted);
+ SlotContext match = runMatching(slotProfile);
+
+ // available previous allocation should override blacklisting
+ Assert.assertEquals(ssc3, match);
}
private SlotContext runMatching(SlotProfile slotProfile) {
return schedulingStrategy.findMatchWithLocality(
slotProfile,
- candidates.stream(),
+ candidates::stream,
(candidate) -> candidate,
(candidate) -> true,
(candidate, locality) -> candidate);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 56fd7e12369..74724629eea 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -107,6 +107,7 @@ public void testSlotReleaseOnFailedResourceAssignment()
throws Exception {
slotProvider,
false,
LocationPreferenceConstraint.ALL,
+ Collections.emptySet(),
TestingUtils.infiniteTime());
assertFalse(allocationFuture.isDone());
@@ -156,6 +157,7 @@ public void
testSlotReleaseOnExecutionCancellationInScheduled() throws Exception
slotProvider,
false,
LocationPreferenceConstraint.ALL,
+ Collections.emptySet(),
TestingUtils.infiniteTime());
assertTrue(allocationFuture.isDone());
@@ -205,6 +207,7 @@ public void
testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
slotProvider,
false,
LocationPreferenceConstraint.ALL,
+ Collections.emptySet(),
TestingUtils.infiniteTime());
assertTrue(allocationFuture.isDone());
@@ -254,6 +257,7 @@ public void
testSlotAllocationCancellationWhenExecutionCancelled() throws Except
slotProvider,
false,
LocationPreferenceConstraint.ALL,
+ Collections.emptySet(),
TestingUtils.infiniteTime());
assertThat(allocationFuture.isDone(), is(false));
@@ -357,7 +361,7 @@ public void
testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
ExecutionVertex executionVertex =
executionJobVertex.getTaskVertices()[0];
- executionVertex.scheduleForExecution(slotProvider, false,
LocationPreferenceConstraint.ANY).get();
+ executionVertex.scheduleForExecution(slotProvider, false,
LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
Execution currentExecutionAttempt =
executionVertex.getCurrentExecutionAttempt();
@@ -417,7 +421,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment()
throws Exception {
assertThat(execution.getTaskRestore(), is(notNullValue()));
// schedule the execution vertex and wait for its deployment
- executionVertex.scheduleForExecution(slotProvider, false,
LocationPreferenceConstraint.ANY).get();
+ executionVertex.scheduleForExecution(slotProvider, false,
LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
assertThat(execution.getTaskRestore(), is(nullValue()));
}
@@ -479,7 +483,8 @@ public void testEagerSchedulingFailureReturnsSlot() throws
Exception {
final CompletableFuture<Void> schedulingFuture =
execution.scheduleForExecution(
slotProvider,
false,
- LocationPreferenceConstraint.ANY);
+ LocationPreferenceConstraint.ANY,
+ Collections.emptySet());
try {
schedulingFuture.get();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index cd613f0f50a..41894767b6a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -40,6 +40,7 @@
import org.junit.Test;
import java.io.IOException;
+import java.util.Collections;
import scala.concurrent.ExecutionContext;
@@ -454,7 +455,7 @@ public void testScheduleOrDeployAfterCancel() {
// it can occur as the result of races
{
Scheduler scheduler = mock(Scheduler.class);
- vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL);
+ vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL, Collections.emptySet());
assertEquals(ExecutionState.CANCELED,
vertex.getExecutionState());
}
@@ -493,7 +494,7 @@ public void testActionsWhileCancelling() {
setVertexState(vertex,
ExecutionState.CANCELING);
Scheduler scheduler = mock(Scheduler.class);
- vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL);
+ vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL, Collections.emptySet());
}
catch (Exception e) {
fail("should not throw an exception");
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 51d1827e668..6bfcb7ff5d0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -36,6 +36,7 @@
import org.junit.Test;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -71,7 +72,7 @@ public void testSlotReleasedWhenScheduledImmediately() {
assertEquals(ExecutionState.CREATED,
vertex.getExecutionState());
// try to deploy to the slot
- vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL);
+ vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL, Collections.emptySet());
// will have failed
assertEquals(ExecutionState.FAILED,
vertex.getExecutionState());
@@ -103,7 +104,7 @@ public void testSlotReleasedWhenScheduledQueued() {
assertEquals(ExecutionState.CREATED,
vertex.getExecutionState());
// try to deploy to the slot
- vertex.scheduleForExecution(scheduler, true,
LocationPreferenceConstraint.ALL);
+ vertex.scheduleForExecution(scheduler, true,
LocationPreferenceConstraint.ALL, Collections.emptySet());
// future has not yet a slot
assertEquals(ExecutionState.SCHEDULED,
vertex.getExecutionState());
@@ -138,7 +139,7 @@ public void testScheduleToDeploying() {
assertEquals(ExecutionState.CREATED,
vertex.getExecutionState());
// try to deploy to the slot
- vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL);
+ vertex.scheduleForExecution(scheduler, false,
LocationPreferenceConstraint.ALL, Collections.emptySet());
assertEquals(ExecutionState.DEPLOYING,
vertex.getExecutionState());
}
catch (Exception e) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index c411393990c..a53debfd0b8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -49,6 +49,7 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -157,7 +158,7 @@ public void testMultiRegionsFailover() throws Exception {
assertEquals(JobStatus.RUNNING,
strategy.getFailoverRegion(ev11).getState());
- ev21.scheduleForExecution(slotProvider, true,
LocationPreferenceConstraint.ALL);
+ ev21.scheduleForExecution(slotProvider, true,
LocationPreferenceConstraint.ALL, Collections.emptySet());
ev21.getCurrentExecutionAttempt().fail(new Exception("New
fail"));
assertEquals(JobStatus.CANCELLING,
strategy.getFailoverRegion(ev11).getState());
assertEquals(JobStatus.RUNNING,
strategy.getFailoverRegion(ev22).getState());
@@ -170,7 +171,7 @@ public void testMultiRegionsFailover() throws Exception {
ev11.getCurrentExecutionAttempt().markFinished();
ev21.getCurrentExecutionAttempt().markFinished();
- ev22.scheduleForExecution(slotProvider, true,
LocationPreferenceConstraint.ALL);
+ ev22.scheduleForExecution(slotProvider, true,
LocationPreferenceConstraint.ALL, Collections.emptySet());
ev22.getCurrentExecutionAttempt().markFinished();
assertEquals(JobStatus.RUNNING,
strategy.getFailoverRegion(ev11).getState());
assertEquals(JobStatus.RUNNING,
strategy.getFailoverRegion(ev22).getState());
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
index 0344d71879b..fb28c2c75a4 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
@@ -41,7 +41,6 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
-import org.junit.Ignore;
import org.junit.Test;
import javax.annotation.Nonnull;
@@ -50,6 +49,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import static
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -72,13 +72,26 @@ public void testDisablingLocalRecovery() throws Exception {
/**
* Tests that if local recovery is enabled we won't spread
- * out tasks when recovering.
+ * out tasks when recovering for global failover.
*/
@Test
- @Ignore("The test should not pass until FLINK-9635 has been fixed")
- public void testLocalRecovery() throws Exception {
+ public void testLocalRecoveryFull() throws Exception {
+ testLocalRecoveryInternal("full");
+ }
+
+ /**
+ * Tests that if local recovery is enabled we won't spread
+ * out tasks when recovering for regional failover.
+ */
+ @Test
+ public void testLocalRecoveryRegion() throws Exception {
+ testLocalRecoveryInternal("region");
+ }
+
+ private void testLocalRecoveryInternal(String failoverStrategyValue)
throws Exception {
final Configuration configuration = new Configuration();
configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY,
true);
+ configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(),
failoverStrategyValue);
executeSchedulingTest(configuration);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Local recovery scheduling can cause spread out of tasks
> -------------------------------------------------------
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0, 1.6.2
> Reporter: Till Rohrmann
> Assignee: Stefan Richter
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such
> that a task tries to be rescheduled to its previous location. In order not to
> occupy slots which have state of other tasks cached, the strategy will
> request a new slot if the old slot identified by the previous allocation id
> is no longer present. This also applies to newly allocated slots because
> there is no distinction between new and already used slots. This behaviour
> can cause every task to be deployed to its own slot if the {{SlotPool}} has
> released all slots in the meantime, for example. The consequence could be
> that a job can no longer be executed after a failure because it needs more
> slots than before.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)