asfgit closed pull request #6360: [FLINK-9884] [runtime] fix slot request may
not be removed when it has already been assigned in slot manager
URL: https://github.com/apache/flink/pull/6360
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub will not display it otherwise):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d74979ae683..f207a36394a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -592,6 +592,9 @@ private void updateSlotState(
// set the allocation id such
that the slot won't be considered for the pending slot request
slot.updateAllocation(allocationId, jobId);
+ // remove the pending request
if any as it has been assigned
+
pendingSlotRequests.remove(allocationId);
+
// this will try to find a new
slot for the request
rejectPendingSlotRequest(
pendingSlotRequest,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fb82aa5026a..c954c878139 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -43,10 +43,12 @@
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import akka.pattern.AskTimeoutException;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -1216,6 +1218,71 @@ public void testSlotRequestFailure() throws Exception {
}
}
+ /**
+ * Tests that pending request is removed if task executor reports a
slot with its allocation id.
+ */
+ @Test
+ public void testSlotRequestRemovedIfTMReportAllocation() throws
Exception {
+ try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(),
+ new
TestingResourceActionsBuilder().createTestingResourceActions())) {
+
+ final JobID jobID = new JobID();
+ final SlotRequest slotRequest1 = new SlotRequest(jobID,
new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+ slotManager.registerSlotRequest(slotRequest1);
+
+ final BlockingQueue<Tuple5<SlotID, JobID, AllocationID,
String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+ final BlockingQueue<CompletableFuture<Acknowledge>>
responseQueue = new ArrayBlockingQueue<>(1);
+
+ final TestingTaskExecutorGateway
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 ->
{
+
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+ try {
+ return
responseQueue.take();
+ } catch (InterruptedException
ignored) {
+ return
FutureUtils.completedExceptionally(new FlinkException("Response queue was
interrupted."));
+ }
+ })
+ .createTestingTaskExecutorGateway();
+
+ final ResourceID taskExecutorResourceId =
ResourceID.generate();
+ final TaskExecutorConnection taskExecutionConnection =
new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+ final SlotReport slotReport = new SlotReport(new
SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+ final CompletableFuture<Acknowledge>
firstManualSlotRequestResponse = new CompletableFuture<>();
+ responseQueue.offer(firstManualSlotRequestResponse);
+
+
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+ final Tuple5<SlotID, JobID, AllocationID, String,
ResourceManagerId> firstRequest = requestSlotQueue.take();
+
+ final CompletableFuture<Acknowledge>
secondManualSlotRequestResponse = new CompletableFuture<>();
+ responseQueue.offer(secondManualSlotRequestResponse);
+
+ final SlotRequest slotRequest2 = new SlotRequest(jobID,
new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+ slotManager.registerSlotRequest(slotRequest2);
+
+ // fail first request
+
firstManualSlotRequestResponse.completeExceptionally(new
AskTimeoutException("Test exception to fail first allocation"));
+
+ final Tuple5<SlotID, JobID, AllocationID, String,
ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+ // fail second request
+
secondManualSlotRequestResponse.completeExceptionally(new
SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
+
+ assertThat(firstRequest.f2,
equalTo(slotRequest1.getAllocationId()));
+ assertThat(secondRequest.f2,
equalTo(slotRequest2.getAllocationId()));
+ assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+
secondManualSlotRequestResponse.complete(Acknowledge.get());
+
+ final TaskManagerSlot slot =
slotManager.getSlot(secondRequest.f0);
+ assertThat(slot.getState(),
equalTo(TaskManagerSlot.State.ALLOCATED));
+ assertThat(slot.getAllocationId(),
equalTo(firstRequest.f2));
+
+ assertEquals(1,
slotManager.getNumberPendingSlotRequests());
+ }
+ }
+
/**
* Tests notify the job manager of the allocations when the task
manager is failed/killed.
*/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services