KarmaGYZ commented on code in PR #24325: URL: https://github.com/apache/flink/pull/24325#discussion_r1494278123
########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java: ########## @@ -216,6 +216,65 @@ void testAllocateSlotFailsWithException() { assertThat(taskManagerInfo.getAllocatedSlots()).isEmpty()); } + @Test + void testAllocationUpdatesIgnoredIfSlotRemoved() throws Exception { + final FineGrainedTaskManagerTracker taskManagerTracker = + new FineGrainedTaskManagerTracker(); + final CompletableFuture< + Tuple6< + SlotID, + JobID, + AllocationID, + ResourceProfile, + String, + ResourceManagerId>> + requestFuture = new CompletableFuture<>(); + final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway taskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction( + tuple6 -> { + requestFuture.complete(tuple6); + return responseFuture; + }) + .createTestingTaskExecutorGateway(); + final TaskExecutorConnection taskExecutorConnection = + new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); + taskManagerTracker.addTaskManager( + taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY); + final ResourceTracker resourceTracker = new DefaultResourceTracker(); + final JobID jobId = new JobID(); + final SlotStatusSyncer slotStatusSyncer = + new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT); + slotStatusSyncer.initialize( + taskManagerTracker, + resourceTracker, + ResourceManagerId.generate(), + EXECUTOR_RESOURCE.getExecutor()); + + final CompletableFuture<Void> allocatedFuture = + slotStatusSyncer.allocateSlot( + taskExecutorConnection.getInstanceID(), + jobId, + "address", + ResourceProfile.ANY); + final AllocationID allocationId = requestFuture.get().f2; + assertThat(resourceTracker.getAcquiredResources(jobId)) + .contains(ResourceRequirement.create(ResourceProfile.ANY, 1)); + assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)) + .hasValueSatisfying( + slot -> { + assertThat(slot.getJobId()).isEqualTo(jobId); + assertThat(slot.getState()).isEqualTo(SlotState.PENDING); + }); + + taskManagerTracker.removeTaskManager(taskExecutorConnection.getInstanceID()); + assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId)).isEmpty(); + + responseFuture.complete(Acknowledge.get()); + assertThat(allocatedFuture).isNotCompletedExceptionally(); Review Comment: Thanks for the pointer! It's an awesome utility class. I think it is a valid issue and exists in other tests as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org