KarmaGYZ commented on code in PR #24325:
URL: https://github.com/apache/flink/pull/24325#discussion_r1494278123


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##########
@@ -216,6 +216,65 @@ void testAllocateSlotFailsWithException() {
                                 assertThat(taskManagerInfo.getAllocatedSlots()).isEmpty());
     }
 
+    @Test
+    void testAllocationUpdatesIgnoredIfSlotRemoved() throws Exception {
+        final FineGrainedTaskManagerTracker taskManagerTracker =
+                new FineGrainedTaskManagerTracker();
+        final CompletableFuture<
+                        Tuple6<
+                                SlotID,
+                                JobID,
+                                AllocationID,
+                                ResourceProfile,
+                                String,
+                                ResourceManagerId>>
+                requestFuture = new CompletableFuture<>();
+        final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
+        final TestingTaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    requestFuture.complete(tuple6);
+                                    return responseFuture;
+                                })
+                        .createTestingTaskExecutorGateway();
+        final TaskExecutorConnection taskExecutorConnection =
+                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
+        taskManagerTracker.addTaskManager(
+                taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
+        final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final JobID jobId = new JobID();
+        final SlotStatusSyncer slotStatusSyncer =
+                new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
+        slotStatusSyncer.initialize(
+                taskManagerTracker,
+                resourceTracker,
+                ResourceManagerId.generate(),
+                EXECUTOR_RESOURCE.getExecutor());
+
+        final CompletableFuture<Void> allocatedFuture =
+                slotStatusSyncer.allocateSlot(
+                        taskExecutorConnection.getInstanceID(),
+                        jobId,
+                        "address",
+                        ResourceProfile.ANY);
+        final AllocationID allocationId = requestFuture.get().f2;
+        assertThat(resourceTracker.getAcquiredResources(jobId))
+                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
+                .hasValueSatisfying(
+                        slot -> {
+                            assertThat(slot.getJobId()).isEqualTo(jobId);
+                            assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
+                        });
+
+        taskManagerTracker.removeTaskManager(taskExecutorConnection.getInstanceID());
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)).isEmpty();
+
+        responseFuture.complete(Acknowledge.get());
+        assertThat(allocatedFuture).isNotCompletedExceptionally();

Review Comment:
   Thanks for the pointer! It's an awesome utility class. I think this is a valid issue, and it exists in other tests as well.


