XComp commented on code in PR #24325: URL: https://github.com/apache/flink/pull/24325#discussion_r1494349521
##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##########
@@ -216,6 +202,58 @@ void testAllocateSlotFailsWithException() {
                 assertThat(taskManagerInfo.getAllocatedSlots()).isEmpty());
     }
 
+    @Test
+    void testAllocationUpdatesIgnoredIfSlotRemoved() throws Exception {

Review Comment:

```java
private static void testSlotAllocation(
        QuadConsumer<SlotStatusSyncer, TaskManagerTracker, InstanceID, AllocationID>
                beforeCompletingSlotRequestCallback)
        throws ExecutionException, InterruptedException {
    final FineGrainedTaskManagerTracker taskManagerTracker =
            new FineGrainedTaskManagerTracker();
    final CompletableFuture<AllocationID> requestFuture = new CompletableFuture<>();
    final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
    // The gateway exposes the requested allocation ID and keeps the response pending.
    final TestingTaskExecutorGateway taskExecutorGateway =
            new TestingTaskExecutorGatewayBuilder()
                    .setRequestSlotFunction(
                            tuple6 -> {
                                requestFuture.complete(tuple6.f2);
                                return responseFuture;
                            })
                    .createTestingTaskExecutorGateway();
    final TaskExecutorConnection taskExecutorConnection =
            new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
    taskManagerTracker.addTaskManager(
            taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
    final ResourceTracker resourceTracker = new DefaultResourceTracker();
    final JobID jobId = new JobID();
    final SlotStatusSyncer slotStatusSyncer =
            new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
    slotStatusSyncer.initialize(
            taskManagerTracker,
            resourceTracker,
            ResourceManagerId.generate(),
            EXECUTOR_RESOURCE.getExecutor());

    final CompletableFuture<Void> allocatedFuture =
            slotStatusSyncer.allocateSlot(
                    taskExecutorConnection.getInstanceID(),
                    jobId,
                    "address",
                    ResourceProfile.ANY);
    final AllocationID allocationId = requestFuture.get();
    assertThat(resourceTracker.getAcquiredResources(jobId))
            .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
    assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
            .hasValueSatisfying(
                    slot -> {
                        assertThat(slot.getJobId()).isEqualTo(jobId);
                        assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
                    });

    // The only test-specific step: runs right before the slot request is completed.
    beforeCompletingSlotRequestCallback.accept(
            slotStatusSyncer,
            taskManagerTracker,
            taskExecutorConnection.getInstanceID(),
            allocationId);

    responseFuture.complete(Acknowledge.get());
    assertThatFuture(allocatedFuture).eventuallySucceeds();
}
```

Instead of adding more redundant code (which means that certain issues have to be fixed in multiple locations), we could move the shared test logic out of the three test methods into a dedicated method that all three tests call. The tests differ only in the action taken shortly before the slot request is completed, so this would reduce the lines of code in this class quite a bit. WDYT?
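
To illustrate the shape of this refactoring without the Flink classes, here is a minimal, self-contained sketch. Everything in it (`ToyTracker`, `runSlotAllocationScenario`, the string states, the `"allocation-1"` ID) is a hypothetical stand-in and not Flink API. It only shows how the scenarios collapse into one-liners once the common setup takes the test-specific step as a callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class SharedTestLogicSketch {

    /** Hypothetical stand-in for a slot tracker; not a Flink class. */
    static final class ToyTracker {
        private final Map<String, String> slotStates = new HashMap<>();

        void addPendingSlot(String allocationId) {
            slotStates.put(allocationId, "PENDING");
        }

        void removeSlot(String allocationId) {
            slotStates.remove(allocationId);
        }

        void markAllocated(String allocationId) {
            // Updates for slots that were removed in the meantime are ignored.
            slotStates.computeIfPresent(allocationId, (id, state) -> "ALLOCATED");
        }

        String stateOf(String allocationId) {
            return slotStates.get(allocationId);
        }
    }

    /**
     * Shared scenario: register a pending slot, run the test-specific callback,
     * then complete the (simulated) slot request. Returns the slot's final
     * state, or null if the slot no longer exists.
     */
    static String runSlotAllocationScenario(
            BiConsumer<ToyTracker, String> beforeCompletingRequest) {
        final ToyTracker tracker = new ToyTracker();
        final String allocationId = "allocation-1";
        final CompletableFuture<Void> response = new CompletableFuture<>();

        tracker.addPendingSlot(allocationId);
        final CompletableFuture<Void> allocated =
                response.thenRun(() -> tracker.markAllocated(allocationId));

        // The only part that differs between the individual tests.
        beforeCompletingRequest.accept(tracker, allocationId);

        response.complete(null);
        allocated.join();
        return tracker.stateOf(allocationId);
    }

    /** Scenario 1: nothing happens before the request completes. */
    static String happyPath() {
        return runSlotAllocationScenario((tracker, id) -> {});
    }

    /** Scenario 2: the slot is removed before the request completes. */
    static String slotRemoved() {
        return runSlotAllocationScenario((tracker, id) -> tracker.removeSlot(id));
    }

    public static void main(String[] args) {
        System.out.println(happyPath());   // prints "ALLOCATED"
        System.out.println(slotRemoved()); // prints "null"
    }
}
```

In the real test class, each `@Test` method would similarly shrink to a single call to `testSlotAllocation(...)` with its own lambda, so a fix to the shared setup only has to be made once.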