asfgit closed pull request #6360: [FLINK-9884] [runtime] fix slot request may 
not be removed when it has already been assigned in slot manager
URL: https://github.com/apache/flink/pull/6360
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d74979ae683..f207a36394a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -592,6 +592,9 @@ private void updateSlotState(
                                                // set the allocation id such 
that the slot won't be considered for the pending slot request
                                                
slot.updateAllocation(allocationId, jobId);
 
+                                               // remove the pending request 
if any as it has been assigned
+                                               
pendingSlotRequests.remove(allocationId);
+
                                                // this will try to find a new 
slot for the request
                                                rejectPendingSlotRequest(
                                                        pendingSlotRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fb82aa5026a..c954c878139 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -43,10 +43,12 @@
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import akka.pattern.AskTimeoutException;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -1216,6 +1218,71 @@ public void testSlotRequestFailure() throws Exception {
                }
        }
 
+       /**
+        * Tests that pending request is removed if task executor reports a 
slot with its allocation id.
+        */
+       @Test
+       public void testSlotRequestRemovedIfTMReportAllocation() throws 
Exception {
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(),
+                               new 
TestingResourceActionsBuilder().createTestingResourceActions())) {
+
+                       final JobID jobID = new JobID();
+                       final SlotRequest slotRequest1 = new SlotRequest(jobID, 
new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+                       slotManager.registerSlotRequest(slotRequest1);
+
+                       final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, 
String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+                       final BlockingQueue<CompletableFuture<Acknowledge>> 
responseQueue = new ArrayBlockingQueue<>(1);
+
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                                       
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> 
{
+                                               
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+                                               try {
+                                                       return 
responseQueue.take();
+                                               } catch (InterruptedException 
ignored) {
+                                                       return 
FutureUtils.completedExceptionally(new FlinkException("Response queue was 
interrupted."));
+                                               }
+                                       })
+                                       .createTestingTaskExecutorGateway();
+
+                       final ResourceID taskExecutorResourceId = 
ResourceID.generate();
+                       final TaskExecutorConnection taskExecutionConnection = 
new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+                       final SlotReport slotReport = new SlotReport(new 
SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+                       final CompletableFuture<Acknowledge> 
firstManualSlotRequestResponse = new CompletableFuture<>();
+                       responseQueue.offer(firstManualSlotRequestResponse);
+
+                       
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+                       final Tuple5<SlotID, JobID, AllocationID, String, 
ResourceManagerId> firstRequest = requestSlotQueue.take();
+
+                       final CompletableFuture<Acknowledge> 
secondManualSlotRequestResponse = new CompletableFuture<>();
+                       responseQueue.offer(secondManualSlotRequestResponse);
+
+                       final SlotRequest slotRequest2 = new SlotRequest(jobID, 
new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+                       slotManager.registerSlotRequest(slotRequest2);
+
+                       // fail first request
+                       
firstManualSlotRequestResponse.completeExceptionally(new 
AskTimeoutException("Test exception to fail first allocation"));
+
+                       final Tuple5<SlotID, JobID, AllocationID, String, 
ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+                       // fail second request
+                       
secondManualSlotRequestResponse.completeExceptionally(new 
SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
+
+                       assertThat(firstRequest.f2, 
equalTo(slotRequest1.getAllocationId()));
+                       assertThat(secondRequest.f2, 
equalTo(slotRequest2.getAllocationId()));
+                       assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+                       
secondManualSlotRequestResponse.complete(Acknowledge.get());
+
+                       final TaskManagerSlot slot = 
slotManager.getSlot(secondRequest.f0);
+                       assertThat(slot.getState(), 
equalTo(TaskManagerSlot.State.ALLOCATED));
+                       assertThat(slot.getAllocationId(), 
equalTo(firstRequest.f2));
+
+                       assertEquals(1, 
slotManager.getNumberPendingSlotRequests());
+               }
+       }
+
        /**
         * Tests notify the job manager of the allocations when the task 
manager is failed/killed.
         */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to