[
https://issues.apache.org/jira/browse/FLINK-9912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617553#comment-16617553
]
ASF GitHub Bot commented on FLINK-9912:
---------------------------------------
asfgit closed pull request #6394: [FLINK-9912][JM] Release TaskExecutors if
they have no slots registered at SlotPool
URL: https://github.com/apache/flink/pull/6394
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
new file mode 100644
index 00000000000..4ec75c51a84
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Serializable {@link Optional}.
+ */
+public final class SerializableOptional<T extends Serializable> implements
Serializable {
+ private static final long serialVersionUID = -3312769593551775940L;
+
+ private static final SerializableOptional<?> EMPTY = new
SerializableOptional<>(null);
+
+ @Nullable
+ private final T value;
+
+ private SerializableOptional(@Nullable T value) {
+ this.value = value;
+ }
+
+ public T get() {
+ if (value == null) {
+ throw new NoSuchElementException("No value present");
+ }
+ return value;
+ }
+
+ public boolean isPresent() {
+ return value != null;
+ }
+
+ public void ifPresent(Consumer<? super T> consumer) {
+ if (value != null) {
+ consumer.accept(value);
+ }
+ }
+
+ public static <T extends Serializable> SerializableOptional<T>
of(@Nonnull T value) {
+ return new SerializableOptional<>(value);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Serializable> SerializableOptional<T> empty() {
+ return (SerializableOptional<T>) EMPTY;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 736984e88e7..21e06af30d6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -103,6 +103,7 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
@@ -833,13 +834,25 @@ public void failSlot(
final Exception cause) {
if (registeredTaskManagers.containsKey(taskManagerId)) {
- slotPoolGateway.failAllocation(allocationId, cause);
+ internalFailAllocation(allocationId, cause);
} else {
log.warn("Cannot fail slot " + allocationId + " because
the TaskManager " +
taskManagerId + " is unknown.");
}
}
+ private void internalFailAllocation(AllocationID allocationId,
Exception cause) {
+ final CompletableFuture<SerializableOptional<ResourceID>>
emptyTaskExecutorFuture = slotPoolGateway.failAllocation(allocationId, cause);
+
+ emptyTaskExecutorFuture.thenAcceptAsync(
+ resourceIdOptional ->
resourceIdOptional.ifPresent(this::releaseEmptyTaskManager),
+ getMainThreadExecutor());
+ }
+
+ private CompletableFuture<Acknowledge>
releaseEmptyTaskManager(ResourceID resourceId) {
+ return disconnectTaskManager(resourceId, new
FlinkException(String.format("No more slots registered at JobMaster %s.",
resourceId)));
+ }
+
@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
@@ -982,7 +995,7 @@ private void startCheckpointScheduler(final
CheckpointCoordinator checkpointCoor
@Override
public void notifyAllocationFailure(AllocationID allocationID,
Exception cause) {
- slotPoolGateway.failAllocation(allocationID, cause);
+ internalFailAllocation(allocationID, cause);
}
//----------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 13f0462455c..b53ee93e643 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -50,6 +50,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -1001,32 +1002,50 @@ private PendingRequest pollMatchingPendingRequest(final
AllocatedSlot slot) {
* and decided to take it back.
*
* @param allocationID Represents the allocation which should be failed
- * @param cause The cause of the failure
+ * @param cause The cause of the failure
+ * @return Optional task executor if it has no more slots registered
*/
@Override
- public void failAllocation(final AllocationID allocationID, final
Exception cause) {
+ public CompletableFuture<SerializableOptional<ResourceID>>
failAllocation(final AllocationID allocationID, final Exception cause) {
final PendingRequest pendingRequest =
pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
// request was still pending
failPendingRequest(pendingRequest, cause);
- }
- else if (availableSlots.tryRemove(allocationID)) {
- log.debug("Failed available slot [{}].", allocationID,
cause);
+ return
CompletableFuture.completedFuture(SerializableOptional.empty());
}
else {
- AllocatedSlot allocatedSlot =
allocatedSlots.remove(allocationID);
- if (allocatedSlot != null) {
- // release the slot.
- // since it is not in 'allocatedSlots' any
more, it will be dropped o return'
- allocatedSlot.releasePayload(cause);
- }
- else {
- log.trace("Outdated request to fail slot
[{}].", allocationID, cause);
- }
+ return tryFailingAllocatedSlot(allocationID, cause);
}
+
// TODO: add some unit tests when the previous two are ready,
the allocation may failed at any phase
}
+ private CompletableFuture<SerializableOptional<ResourceID>>
tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
+ AllocatedSlot allocatedSlot =
availableSlots.tryRemove(allocationID);
+
+ if (allocatedSlot == null) {
+ allocatedSlot = allocatedSlots.remove(allocationID);
+ }
+
+ if (allocatedSlot != null) {
+ log.debug("Failed allocated slot [{}]: {}",
allocationID, cause.getMessage());
+
+ // notify TaskExecutor about the failure
+
allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
+ // release the slot.
+ // since it is not in 'allocatedSlots' any more, it
will be dropped on return
+ allocatedSlot.releasePayload(cause);
+
+ final ResourceID taskManagerId =
allocatedSlot.getTaskManagerId();
+
+ if (!availableSlots.containsTaskManager(taskManagerId)
&& !allocatedSlots.containResource(taskManagerId)) {
+ return
CompletableFuture.completedFuture(SerializableOptional.of(taskManagerId));
+ }
+ }
+
+ return
CompletableFuture.completedFuture(SerializableOptional.empty());
+ }
+
//
------------------------------------------------------------------------
// Resource
//
------------------------------------------------------------------------
@@ -1107,7 +1126,7 @@ private void checkIdleSlot() {
for (AllocatedSlot expiredSlot : expiredSlots) {
final AllocationID allocationID =
expiredSlot.getAllocationId();
- if (availableSlots.tryRemove(allocationID)) {
+ if (availableSlots.tryRemove(allocationID) != null) {
log.info("Releasing idle slot [{}].",
allocationID);
final CompletableFuture<Acknowledge>
freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
@@ -1502,7 +1521,7 @@ SlotAndLocality poll(SchedulingStrategy
schedulingStrategy, SlotProfile slotProf
}
}
- boolean tryRemove(AllocationID slotId) {
+ AllocatedSlot tryRemove(AllocationID slotId) {
final SlotAndTimestamp sat =
availableSlots.remove(slotId);
if (sat != null) {
final AllocatedSlot slot = sat.slot();
@@ -1522,15 +1541,15 @@ boolean tryRemove(AllocationID slotId) {
availableSlotsByHost.remove(host);
}
- return true;
+ return slot;
}
else {
- return false;
+ return null;
}
}
private void remove(AllocationID slotId) throws
IllegalStateException {
- if (!tryRemove(slotId)) {
+ if (tryRemove(slotId) == null) {
throw new IllegalStateException("slot not
contained");
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 34d9c7ff601..3e546ff3674 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.types.SerializableOptional;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -126,8 +127,9 @@
*
* @param allocationID identifying the slot which is being failed
* @param cause of the failure
+ * @return An optional task executor id if this task executor has no
more slots registered
*/
- void failAllocation(AllocationID allocationID, Exception cause);
+ CompletableFuture<SerializableOptional<ResourceID>>
failAllocation(AllocationID allocationID, Exception cause);
//
------------------------------------------------------------------------
// allocating and disposing slots
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..ef288a26469 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import java.util.AbstractCollection;
import java.util.Collection;
@@ -82,9 +81,6 @@
private static final Logger LOG =
LoggerFactory.getLogger(SlotSharingManager.class);
- /** Lock for the internal data structures. */
- private final Object lock = new Object();
-
private final SlotSharingGroupId slotSharingGroupId;
/** Actions to release allocated slots after a complete multi task slot
hierarchy has been released. */
@@ -96,11 +92,9 @@
private final Map<SlotRequestId, TaskSlot> allTaskSlots;
/** Root nodes which have not been completed because the allocated slot
is still pending. */
- @GuardedBy("lock")
private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
/** Root nodes which have been completed (the underlying allocated slot
has been assigned). */
- @GuardedBy("lock")
private final Map<TaskManagerLocation, Set<MultiTaskSlot>>
resolvedRootSlots;
SlotSharingManager(
@@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(
allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
- synchronized (lock) {
- unresolvedRootSlots.put(slotRequestId,
rootMultiTaskSlot);
- }
+ unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
// add the root node to the set of resolved root nodes once the
SlotContext future has
// been completed and we know the slot's TaskManagerLocation
slotContextFuture.whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (slotContext != null) {
- synchronized (lock) {
- final MultiTaskSlot
resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+ final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
- if (resolvedRootNode != null) {
- LOG.trace("Fulfill
multi task slot [{}] with slot [{}].", slotRequestId,
slotContext.getAllocationId());
+ if (resolvedRootNode != null) {
+ LOG.trace("Fulfill multi task
slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
- final
Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-
slotContext.getTaskManagerLocation(),
-
taskManagerLocation -> new HashSet<>(4));
+ final Set<MultiTaskSlot>
innerCollection = resolvedRootSlots.computeIfAbsent(
+
slotContext.getTaskManagerLocation(),
+ taskManagerLocation ->
new HashSet<>(4));
-
innerCollection.add(resolvedRootNode);
- }
+
innerCollection.add(resolvedRootNode);
}
} else {
rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
*/
@Nullable
MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId,
SchedulingStrategy matcher, SlotProfile slotProfile) {
- synchronized (lock) {
- Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues
= this.resolvedRootSlots.values();
- return matcher.findMatchWithLocality(
- slotProfile,
-
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
- (MultiTaskSlot multiTaskSlot) ->
multiTaskSlot.getSlotContextFuture().join(),
- (MultiTaskSlot multiTaskSlot) ->
!multiTaskSlot.contains(groupId),
- MultiTaskSlotLocality::of);
- }
+ Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues =
this.resolvedRootSlots.values();
+ return matcher.findMatchWithLocality(
+ slotProfile,
+
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+ (MultiTaskSlot multiTaskSlot) ->
multiTaskSlot.getSlotContextFuture().join(),
+ (MultiTaskSlot multiTaskSlot) ->
!multiTaskSlot.contains(groupId),
+ MultiTaskSlotLocality::of);
}
/**
@@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID
groupId, SchedulingStrategy
*/
@Nullable
MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
- synchronized (lock) {
- for (MultiTaskSlot multiTaskSlot :
unresolvedRootSlots.values()) {
- if (!multiTaskSlot.contains(groupId)) {
- return multiTaskSlot;
- }
+ for (MultiTaskSlot multiTaskSlot :
unresolvedRootSlots.values()) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return multiTaskSlot;
}
}
@@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
public String toString() {
final StringBuilder builder = new
StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
- synchronized (lock) {
-
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-
builder.append("\tall=").append(allTaskSlots).append('\n');
- }
+
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+ builder.append("\tall=").append(allTaskSlots).append('\n');
return builder.append('}').toString();
}
@@ -479,26 +463,20 @@ public void release(Throwable cause) {
parent.releaseChild(getGroupId());
} else if (allTaskSlots.remove(getSlotRequestId()) !=
null) {
// we are the root node --> remove the root
node from the list of task slots
+ final MultiTaskSlot unresolvedRootSlot =
unresolvedRootSlots.remove(getSlotRequestId());
- if (!slotContextFuture.isDone() ||
slotContextFuture.isCompletedExceptionally()) {
- synchronized (lock) {
- // the root node should still
be unresolved
-
unresolvedRootSlots.remove(getSlotRequestId());
- }
- } else {
+ if (unresolvedRootSlot == null) {
// the root node should be resolved -->
we can access the slot context
final SlotContext slotContext =
slotContextFuture.getNow(null);
if (slotContext != null) {
- synchronized (lock) {
- final
Set<MultiTaskSlot> multiTaskSlots =
resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+ final Set<MultiTaskSlot>
multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
- if (multiTaskSlots !=
null) {
-
multiTaskSlots.remove(this);
+ if (multiTaskSlots != null) {
+
multiTaskSlots.remove(this);
- if
(multiTaskSlots.isEmpty()) {
-
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
- }
+ if
(multiTaskSlots.isEmpty()) {
+
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
}
}
@@ -637,9 +615,7 @@ public String toString() {
@VisibleForTesting
Collection<MultiTaskSlot> getUnresolvedRootSlots() {
- synchronized (lock) {
- return unresolvedRootSlots.values();
- }
+ return unresolvedRootSlots.values();
}
/**
@@ -649,19 +625,15 @@ public String toString() {
@Override
public Iterator<MultiTaskSlot> iterator() {
- synchronized (lock) {
- return new
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
- }
+ return new
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
}
@Override
public int size() {
int numberResolvedMultiTaskSlots = 0;
- synchronized (lock) {
- for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
- numberResolvedMultiTaskSlots +=
multiTaskSlots.size();
- }
+ for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
+ numberResolvedMultiTaskSlots +=
multiTaskSlots.size();
}
return numberResolvedMultiTaskSlots;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 5c62a737096..e53b48021ee 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -34,9 +34,12 @@
import org.apache.flink.runtime.messages.StackTrace;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import javax.annotation.Nonnull;
+
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -54,6 +57,9 @@
private volatile BiFunction<AllocationID, Throwable,
CompletableFuture<Acknowledge>> freeSlotFunction;
+ @Nonnull
+ private volatile BiConsumer<InstanceID, Exception>
disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {};
+
public SimpleAckingTaskManagerGateway() {
optSubmitConsumer = Optional.empty();
optCancelConsumer = Optional.empty();
@@ -71,13 +77,19 @@ public void setFreeSlotFunction(BiFunction<AllocationID,
Throwable, CompletableF
this.freeSlotFunction = freeSlotFunction;
}
+ public void setDisconnectFromJobManagerConsumer(@Nonnull
BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) {
+ this.disconnectFromJobManagerConsumer =
disconnectFromJobManagerConsumer;
+ }
+
@Override
public String getAddress() {
return address;
}
@Override
- public void disconnectFromJobManager(InstanceID instanceId, Exception
cause) {}
+ public void disconnectFromJobManager(InstanceID instanceId, Exception
cause) {
+ disconnectFromJobManagerConsumer.accept(instanceId, cause);
+ }
@Override
public void stopCluster(ApplicationStatus applicationStatus, String
message) {}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9a2bc97b62b..462b1d16da9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -550,20 +550,6 @@ public void testSlotRequestTimeoutWhenNoSlotOffering()
throws Exception {
}
}
- private JobGraph createSingleVertexJobWithRestartStrategy() throws
IOException {
- final JobVertex jobVertex = new JobVertex("Test vertex");
- jobVertex.setInvokableClass(NoOpInvokable.class);
-
- final ExecutionConfig executionConfig = new ExecutionConfig();
-
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
0L));
-
- final JobGraph jobGraph = new JobGraph(jobVertex);
- jobGraph.setAllowQueuedScheduling(true);
- jobGraph.setExecutionConfig(executionConfig);
-
- return jobGraph;
- }
-
/**
* Tests that we can close an unestablished ResourceManager connection.
*/
@@ -975,6 +961,72 @@ public void testTriggerSavepointTimeout() throws Exception
{
}
}
+ /**
+ * Tests that the TaskExecutor is released if all of its slots have
been freed.
+ */
+ @Test
+ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws
Exception {
+ final JobManagerSharedServices jobManagerSharedServices = new
TestingJobManagerSharedServicesBuilder().build();
+
+ final JobGraph jobGraph =
createSingleVertexJobWithRestartStrategy();
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ jobGraph,
+ haServices,
+ jobManagerSharedServices,
+ heartbeatServices);
+
+ final TestingResourceManagerGateway
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
rpcService.registerGateway(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway);
+
rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway.getFencingToken().toUUID());
+
+ final CompletableFuture<AllocationID> allocationIdFuture = new
CompletableFuture<>();
+
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ slotRequest ->
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+ final CompletableFuture<JobID> disconnectTaskExecutorFuture =
new CompletableFuture<>();
+ final CompletableFuture<AllocationID> freedSlotFuture = new
CompletableFuture<>();
+ final TestingTaskExecutorGateway testingTaskExecutorGateway =
new TestingTaskExecutorGatewayBuilder()
+ .setFreeSlotFunction(
+ (allocationID, throwable) -> {
+ freedSlotFuture.complete(allocationID);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .setDisconnectJobManagerConsumer((jobID, throwable) ->
disconnectTaskExecutorFuture.complete(jobID))
+ .createTestingTaskExecutorGateway();
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(),
testingTaskExecutorGateway);
+
+ try {
+ jobMaster.start(jobMasterId, testingTimeout).get();
+
+ final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+ final AllocationID allocationId =
allocationIdFuture.get();
+
+
jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(),
taskManagerLocation, testingTimeout).get();
+
+ final SlotOffer slotOffer = new SlotOffer(allocationId,
0, ResourceProfile.UNKNOWN);
+ final CompletableFuture<Collection<SlotOffer>>
acceptedSlotOffers =
jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(),
Collections.singleton(slotOffer), testingTimeout);
+
+ final Collection<SlotOffer> slotOffers =
acceptedSlotOffers.get();
+
+ // check that we accepted the offered slot
+ assertThat(slotOffers, hasSize(1));
+
+ // now fail the allocation and check that we close the
connection to the TaskExecutor
+ jobMasterGateway.notifyAllocationFailure(allocationId,
new FlinkException("Fail alloction test exception"));
+
+ // we should free the slot and then disconnect from the
TaskExecutor because we no longer use any slots from it
+ assertThat(freedSlotFuture.get(),
equalTo(allocationId));
+ assertThat(disconnectTaskExecutorFuture.get(),
equalTo(jobGraph.getJobID()));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
private JobGraph producerConsumerJobGraph() {
final JobVertex producer = new JobVertex("Producer");
producer.setInvokableClass(NoOpInvokable.class);
@@ -1064,6 +1116,20 @@ private JobMaster createJobMaster(
JobMasterTest.class.getClassLoader());
}
+ private JobGraph createSingleVertexJobWithRestartStrategy() throws
IOException {
+ final JobVertex jobVertex = new JobVertex("Test vertex");
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ final ExecutionConfig executionConfig = new ExecutionConfig();
+
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
0L));
+
+ final JobGraph jobGraph = new JobGraph(jobVertex);
+ jobGraph.setAllowQueuedScheduling(true);
+ jobGraph.setExecutionConfig(executionConfig);
+
+ return jobGraph;
+ }
+
/**
* No op implementation of {@link OnCompletionActions}.
*/
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 9815cb289da..3a9925c4f20 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -44,6 +45,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.ManualClock;
+import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -59,7 +61,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -69,6 +73,8 @@
import static
org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -692,12 +698,7 @@ public void testReleasingIdleSlotFailed() throws Exception
{
slotPool.triggerCheckIdleSlot();
- CompletableFuture<LogicalSlot> allocatedSlotFuture =
slotPoolGateway.allocateSlot(
- new SlotRequestId(),
- new DummyScheduledUnit(),
- SlotProfile.noRequirements(),
- true,
- timeout);
+ CompletableFuture<LogicalSlot> allocatedSlotFuture =
allocateSlot(slotPoolGateway, new SlotRequestId());
// wait until the slot has been fulfilled with the
previously idling slot
final LogicalSlot logicalSlot =
allocatedSlotFuture.get();
@@ -712,12 +713,7 @@ public void testReleasingIdleSlotFailed() throws Exception
{
slotPool.triggerCheckIdleSlot();
// request a new slot after the idling slot has been
released
- allocatedSlotFuture = slotPoolGateway.allocateSlot(
- new SlotRequestId(),
- new DummyScheduledUnit(),
- SlotProfile.noRequirements(),
- true,
- timeout);
+ allocatedSlotFuture = allocateSlot(slotPoolGateway, new
SlotRequestId());
// release the TaskExecutor before we get a response
from the slot releasing
slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(),
null).get();
@@ -739,6 +735,114 @@ public void testReleasingIdleSlotFailed() throws
Exception {
}
}
+ /**
+ * Tests that failed slots are freed on the {@link TaskExecutor}.
+ */
+ @Test
+ public void testFreeFailedSlots() throws Exception {
+ final SlotPool slotPool = new SlotPool(rpcService, jobId,
LocationPreferenceSchedulingStrategy.getInstance());
+
+ try {
+ final int parallelism = 5;
+ final ArrayBlockingQueue<AllocationID> allocationIds =
new ArrayBlockingQueue<>(parallelism);
+ resourceManagerGateway.setRequestSlotConsumer(
+ slotRequest ->
allocationIds.offer(slotRequest.getAllocationId()));
+
+ final SlotPoolGateway slotPoolGateway =
setupSlotPool(slotPool, resourceManagerGateway);
+
+ final Map<SlotRequestId,
CompletableFuture<LogicalSlot>> slotRequestFutures = new HashMap<>(parallelism);
+
+ for (int i = 0; i < parallelism; i++) {
+ final SlotRequestId slotRequestId = new
SlotRequestId();
+ slotRequestFutures.put(slotRequestId,
allocateSlot(slotPoolGateway, slotRequestId));
+ }
+
+ final List<SlotOffer> slotOffers = new
ArrayList<>(parallelism);
+
+ for (int i = 0; i < parallelism; i++) {
+ slotOffers.add(new
SlotOffer(allocationIds.take(), i, ResourceProfile.UNKNOWN));
+ }
+
+
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+ slotPoolGateway.offerSlots(taskManagerLocation,
taskManagerGateway, slotOffers);
+
+ // wait for the completion of all slot futures
+
FutureUtils.waitForAll(slotRequestFutures.values()).get();
+
+ final ArrayBlockingQueue<AllocationID> freedSlots = new
ArrayBlockingQueue<>(1);
+ taskManagerGateway.setFreeSlotFunction(
+ (allocationID, throwable) -> {
+ freedSlots.offer(allocationID);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ });
+
+ final FlinkException failException = new
FlinkException("Test fail exception");
+ // fail allocations one by one
+ for (int i = 0; i < parallelism - 1; i++) {
+ final SlotOffer slotOffer = slotOffers.get(i);
+ final
CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture =
slotPoolGateway.failAllocation(
+ slotOffer.getAllocationId(),
+ failException);
+
+
assertThat(emptyTaskExecutorFuture.get().isPresent(), is(false));
+ assertThat(freedSlots.take(),
is(equalTo(slotOffer.getAllocationId())));
+ }
+
+ final SlotOffer slotOffer = slotOffers.get(parallelism
- 1);
+ final
CompletableFuture<SerializableOptional<ResourceID>> emptyTaskExecutorFuture =
slotPoolGateway.failAllocation(
+ slotOffer.getAllocationId(),
+ failException);
+ assertThat(emptyTaskExecutorFuture.get().get(),
is(equalTo(taskManagerLocation.getResourceID())));
+ assertThat(freedSlots.take(),
is(equalTo(slotOffer.getAllocationId())));
+
+ } finally {
+ RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+ }
+ }
+
+ /**
+ * Tests that failing an allocation fails the pending slot request.
+ */
+ @Test
+ public void testFailingAllocationFailsPendingSlotRequests() throws
Exception {
+ final SlotPool slotPool = new SlotPool(rpcService, jobId,
LocationPreferenceSchedulingStrategy.getInstance());
+
+ try {
+ final CompletableFuture<AllocationID>
allocationIdFuture = new CompletableFuture<>();
+
resourceManagerGateway.setRequestSlotConsumer(slotRequest ->
allocationIdFuture.complete(slotRequest.getAllocationId()));
+ final SlotPoolGateway slotPoolGateway =
setupSlotPool(slotPool, resourceManagerGateway);
+
+ final CompletableFuture<LogicalSlot> slotFuture =
allocateSlot(slotPoolGateway, new SlotRequestId());
+
+ final AllocationID allocationId =
allocationIdFuture.get();
+
+ assertThat(slotFuture.isDone(), is(false));
+
+ final FlinkException cause = new FlinkException("Fail
pending slot request failure.");
+ final
CompletableFuture<SerializableOptional<ResourceID>> responseFuture =
slotPoolGateway.failAllocation(allocationId, cause);
+
+ assertThat(responseFuture.get().isPresent(), is(false));
+
+ try {
+ slotFuture.get();
+ fail("Expected a slot allocation failure.");
+ } catch (ExecutionException ee) {
+
assertThat(ExceptionUtils.stripExecutionException(ee), equalTo(cause));
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+ }
+ }
+
+ private CompletableFuture<LogicalSlot> allocateSlot(SlotPoolGateway
slotPoolGateway, SlotRequestId slotRequestId) {
+ return slotPoolGateway.allocateSlot(
+ slotRequestId,
+ new DummyScheduledUnit(),
+ SlotProfile.noRequirements(),
+ true,
+ timeout);
+ }
+
private static SlotPoolGateway setupSlotPool(
SlotPool slotPool,
ResourceManagerGateway resourceManagerGateway) throws
Exception {
@@ -750,13 +854,4 @@ private static SlotPoolGateway setupSlotPool(
return slotPool.getSelfGateway(SlotPoolGateway.class);
}
-
- private AllocatedSlot createSlot(final AllocationID allocationId) {
- return new AllocatedSlot(
- allocationId,
- taskManagerLocation,
- 0,
- ResourceProfile.UNKNOWN,
- taskManagerGateway);
- }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index a9e99495e34..912de36c881 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -59,13 +59,16 @@
private final Function<Tuple5<SlotID, JobID, AllocationID, String,
ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction;
- TestingTaskExecutorGateway(String address, String hostname,
Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable>
disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId,
CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID,
JobID, AllocationID, String, ResourceManagerId>,
CompletableFuture<Acknowledge>> requestSlotFunction) {
+ private final BiFunction<AllocationID, Throwable,
CompletableFuture<Acknowledge>> freeSlotFunction;
+
+ TestingTaskExecutorGateway(String address, String hostname,
Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable>
disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId,
CompletableFuture<Acknowledge>> submitTaskConsumer, Function<Tuple5<SlotID,
JobID, AllocationID, String, ResourceManagerId>,
CompletableFuture<Acknowledge>> requestSlotFunction, BiFunction<AllocationID,
Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
this.heartbeatJobManagerConsumer =
Preconditions.checkNotNull(heartbeatJobManagerConsumer);
this.disconnectJobManagerConsumer =
Preconditions.checkNotNull(disconnectJobManagerConsumer);
this.submitTaskConsumer =
Preconditions.checkNotNull(submitTaskConsumer);
this.requestSlotFunction =
Preconditions.checkNotNull(requestSlotFunction);
+ this.freeSlotFunction =
Preconditions.checkNotNull(freeSlotFunction);
}
@Override
@@ -141,7 +144,7 @@ public void disconnectResourceManager(Exception cause) {
@Override
public CompletableFuture<Acknowledge> freeSlot(AllocationID
allocationId, Throwable cause, Time timeout) {
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return freeSlotFunction.apply(allocationId, cause);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 1c2f1328a5a..e59eefd0c8d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -43,6 +43,7 @@
private static final BiConsumer<JobID, Throwable>
NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
private static final BiFunction<TaskDeploymentDescriptor, JobMasterId,
CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA,
ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
private static final Function<Tuple5<SlotID, JobID, AllocationID,
String, ResourceManagerId>, CompletableFuture<Acknowledge>>
NOOP_REQUEST_SLOT_FUNCTION = ignored ->
CompletableFuture.completedFuture(Acknowledge.get());
+ private static final BiFunction<AllocationID, Throwable,
CompletableFuture<Acknowledge>> NOOP_FREE_SLOT_FUNCTION = (ignoredA, ignoredB)
-> CompletableFuture.completedFuture(Acknowledge.get());
private String address = "foobar:1234";
private String hostname = "foobar";
@@ -50,6 +51,7 @@
private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer =
NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
private BiFunction<TaskDeploymentDescriptor, JobMasterId,
CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
private Function<Tuple5<SlotID, JobID, AllocationID, String,
ResourceManagerId>, CompletableFuture<Acknowledge>> requestSlotFunction =
NOOP_REQUEST_SLOT_FUNCTION;
+ private BiFunction<AllocationID, Throwable,
CompletableFuture<Acknowledge>> freeSlotFunction = NOOP_FREE_SLOT_FUNCTION;
public TestingTaskExecutorGatewayBuilder setAddress(String address) {
this.address = address;
@@ -81,7 +83,12 @@ public TestingTaskExecutorGatewayBuilder
setRequestSlotFunction(Function<Tuple5<
return this;
}
+ public TestingTaskExecutorGatewayBuilder
setFreeSlotFunction(BiFunction<AllocationID, Throwable,
CompletableFuture<Acknowledge>> freeSlotFunction) {
+ this.freeSlotFunction = freeSlotFunction;
+ return this;
+ }
+
public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
- return new TestingTaskExecutorGateway(address, hostname,
heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer,
requestSlotFunction);
+ return new TestingTaskExecutorGateway(address, hostname,
heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer,
requestSlotFunction, freeSlotFunction);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Release TaskExecutors from SlotPool if all slots have been removed
> ------------------------------------------------------------------
>
> Key: FLINK-9912
> URL: https://issues.apache.org/jira/browse/FLINK-9912
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.1, 1.6.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, it is possible to fail slot allocations in the {{SlotPool}}.
> Failing an allocation means that the slot is removed from the {{SlotPool}}.
> If we have removed all slots from a {{TaskExecutor}}, then we should also
> release/close the connection to this {{TaskExecutor}}. At the moment, this
> only happens via the heartbeats if the {{TaskExecutor}} has become
> unreachable.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)