tillrohrmann commented on a change in pull request #15612:
URL: https://github.com/apache/flink/pull/15612#discussion_r613382581



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -1228,6 +1228,72 @@ public void testSlotOfferCounterIsSeparatedByJob() 
throws Exception {
         }
     }
 
+    /**
+     * Tests that freeing an inactive slot is a legal operation that does not 
throw an exception.
+     */
+    @Test
+    public void testFreeingInactiveSlotDoesNotFail() throws Exception {
+        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createRmWithTmRegisterAndNotifySlotHooks(
+                        new InstanceID(), taskExecutorIsRegistered, new 
CompletableFuture<>());
+
+        rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
+
+        final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
+        final TestingJobMasterGateway jobMasterGateway =
+                new TestingJobMasterGatewayBuilder()
+                        .setOfferSlotsFunction(
+                                (resourceID, slotOffers) -> {
+                                    offerSlotsLatch.trigger();
+                                    return new CompletableFuture<>();
+                                })
+                        .build();
+
+        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+        final TaskSlotTable<Task> taskSlotTable = 
TaskSlotUtils.createTaskSlotTable(1);
+        final TaskExecutorLocalStateStoresManager localStateStoresManager =
+                createTaskExecutorLocalStateStoresManager();
+        final TaskManagerServices taskManagerServices =
+                new TaskManagerServicesBuilder()
+                        
.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
+                        .setTaskSlotTable(taskSlotTable)
+                        .setTaskStateManager(localStateStoresManager)
+                        .build();
+
+        final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices);
+
+        try {
+            taskExecutor.start();
+            taskExecutor.waitUntilStarted();
+
+            final TaskExecutorGateway tmGateway =
+                    taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+            taskExecutorIsRegistered.await();
+
+            jobManagerLeaderRetriever.notifyListener(
+                    jobMasterGateway.getAddress(), 
jobMasterGateway.getFencingToken().toUUID());
+
+            final AllocationID allocationId = new AllocationID();
+
+            requestSlot(
+                    tmGateway,
+                    allocationId,
+                    0,
+                    jobId,
+                    resourceManagerGateway.getFencingToken(),
+                    jobMasterGateway.getAddress());
+
+            offerSlotsLatch.await();
+
+            tmGateway.freeSlot(allocationId, new RuntimeException("test 
exception"), timeout).get();

Review comment:
       Should we also assert that the slot has actually been freed and returned 
to the `ResourceManager`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -983,6 +991,309 @@ public void testSlotAcceptance() throws Exception {
         }
     }
 
+    private enum ResponseOrder {
+        ACCEPT_THEN_REJECT,
+        REJECT_THEN_ACCEPT
+    }
+
+    /**
+     * Tests that the task executor does not release a slot that was rejected 
by the job master, if
+     * another slot offer is currently in progress.
+     */
+    @Test
+    public void testRejectedSlotNotFreedIfAnotherOfferIsPending() throws 
Exception {
+        
testSlotOfferResponseWithPendingSlotOffer(ResponseOrder.REJECT_THEN_ACCEPT);
+    }
+
+    /**
+     * Tests that the task executor does not activate a slot that was accepted 
by the job master, if
+     * another slot offer is currently in progress.
+     */
+    @Test
+    public void testAcceptedSlotNotActivatedIfAnotherOfferIsPending() throws 
Exception {
+        
testSlotOfferResponseWithPendingSlotOffer(ResponseOrder.ACCEPT_THEN_REJECT);
+    }
+
+    /**
+     * Tests the behavior of the task executor when a slot offer response is 
received while a newer
+     * slot offer is in progress.
+     */
+    private void testSlotOfferResponseWithPendingSlotOffer(final ResponseOrder 
responseOrder)
+            throws Exception {
+        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createRmWithTmRegisterAndNotifySlotHooks(
+                        new InstanceID(), taskExecutorIsRegistered, new 
CompletableFuture<>());
+
+        final CompletableFuture<Collection<SlotOffer>> 
firstOfferResponseFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<Collection<SlotOffer>> 
secondOfferResponseFuture =
+                new CompletableFuture<>();
+
+        final Queue<CompletableFuture<Collection<SlotOffer>>> 
slotOfferResponses =
+                new ArrayDeque<>(
+                        Arrays.asList(firstOfferResponseFuture, 
secondOfferResponseFuture));
+
+        final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
+        final TestingJobMasterGateway jobMasterGateway =
+                new TestingJobMasterGatewayBuilder()
+                        .setOfferSlotsFunction(
+                                (resourceID, slotOffers) -> {
+                                    offerSlotsLatch.trigger();
+                                    return slotOfferResponses.remove();
+                                })
+                        .build();
+
+        rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
+        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+        final TaskSlotTable<Task> taskSlotTable = 
TaskSlotUtils.createTaskSlotTable(2);
+        final TaskManagerServices taskManagerServices =
+                createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
+        final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices);
+
+        final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable =
+                new ThreadSafeTaskSlotTable<>(
+                        taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
+
+        final SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0, 
ResourceProfile.ANY);
+        final SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1, 
ResourceProfile.ANY);
+
+        try {
+            taskExecutor.start();
+            taskExecutor.waitUntilStarted();
+
+            final TaskExecutorGateway tmGateway =
+                    taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+            // wait until task executor registered at the RM
+            taskExecutorIsRegistered.await();
+
+            // notify job leader to start slot offering
+            jobManagerLeaderRetriever.notifyListener(
+                    jobMasterGateway.getAddress(), 
jobMasterGateway.getFencingToken().toUUID());
+
+            // request the first slot
+            requestSlot(
+                    tmGateway,
+                    slotOffer1.getAllocationId(),
+                    slotOffer1.getSlotIndex(),
+                    resourceManagerGateway.getFencingToken(),
+                    jobMasterGateway.getAddress());
+
+            // wait until first slot offer as arrived
+            offerSlotsLatch.await();
+
+            // request second slot, triggering another offer containing both 
slots
+            requestSlot(
+                    tmGateway,
+                    slotOffer2.getAllocationId(),
+                    slotOffer2.getSlotIndex(),
+                    resourceManagerGateway.getFencingToken(),
+                    jobMasterGateway.getAddress());
+
+            // wait until second slot offer as arrived
+            offerSlotsLatch.await();
+
+            switch (responseOrder) {
+                case ACCEPT_THEN_REJECT:
+                    // accept the first offer, but reject both slots for the 
second offer
+                    
firstOfferResponseFuture.complete(Collections.singletonList(slotOffer1));
+                    assertThat(
+                            
threadSafeTaskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId),
+                            empty());
+                    
secondOfferResponseFuture.complete(Collections.emptyList());
+                    
assertThat(threadSafeTaskSlotTable.getAllocationIdsPerJob(jobId), empty());
+                    return;
+                case REJECT_THEN_ACCEPT:
+                    // fail the first offer, but accept both slots for the 
second offer
+                    // in the past the rejection of the first offer freed the 
slot; when the slot is
+                    // accepted from the second offer the activation of said 
slot then failed
+                    firstOfferResponseFuture.complete(Collections.emptyList());
+                    
secondOfferResponseFuture.complete(Arrays.asList(slotOffer1, slotOffer2));
+                    assertThat(
+                            
threadSafeTaskSlotTable.getAllocationIdsPerJob(jobId),
+                            containsInAnyOrder(
+                                    slotOffer1.getAllocationId(), 
slotOffer2.getAllocationId()));
+                    return;
+            }
+        } finally {
+            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+        }
+    }
+
+    @Test
+    public void testSlotOfferCounterIsSeparatedByJob() throws Exception {
+        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createRmWithTmRegisterAndNotifySlotHooks(
+                        new InstanceID(), taskExecutorIsRegistered, new 
CompletableFuture<>());
+
+        final CompletableFuture<Collection<SlotOffer>> 
firstOfferResponseFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<Collection<SlotOffer>> 
secondOfferResponseFuture =
+                new CompletableFuture<>();
+
+        final Queue<CompletableFuture<Collection<SlotOffer>>> 
slotOfferResponses =
+                new ArrayDeque<>(
+                        Arrays.asList(firstOfferResponseFuture, 
secondOfferResponseFuture));
+
+        final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
+        final TestingJobMasterGateway jobMasterGateway1 =
+                new TestingJobMasterGatewayBuilder()
+                        .setAddress("jm1")
+                        .setOfferSlotsFunction(
+                                (resourceID, slotOffers) -> {
+                                    offerSlotsLatch.trigger();
+                                    return slotOfferResponses.remove();
+                                })
+                        .build();
+        final TestingJobMasterGateway jobMasterGateway2 =
+                new TestingJobMasterGatewayBuilder()
+                        .setAddress("jm2")
+                        .setOfferSlotsFunction(
+                                (resourceID, slotOffers) -> {
+                                    offerSlotsLatch.trigger();
+                                    return slotOfferResponses.remove();
+                                })
+                        .build();
+
+        rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
+        rpc.registerGateway(jobMasterGateway1.getAddress(), jobMasterGateway1);
+        rpc.registerGateway(jobMasterGateway2.getAddress(), jobMasterGateway2);
+
+        final TaskSlotTable<Task> taskSlotTable = 
TaskSlotUtils.createTaskSlotTable(2);
+        final TaskManagerServices taskManagerServices =
+                createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
+        final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices);
+
+        final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable =
+                new ThreadSafeTaskSlotTable<>(
+                        taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
+
+        final SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0, 
ResourceProfile.ANY);
+        final SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1, 
ResourceProfile.ANY);
+
+        try {
+            taskExecutor.start();
+            taskExecutor.waitUntilStarted();
+
+            final TaskExecutorGateway tmGateway =
+                    taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+            // wait until task executor registered at the RM
+            taskExecutorIsRegistered.await();
+
+            // notify job leader to start slot offering
+            jobManagerLeaderRetriever.notifyListener(
+                    jobMasterGateway1.getAddress(), 
jobMasterGateway1.getFencingToken().toUUID());
+            jobManagerLeaderRetriever2.notifyListener(
+                    jobMasterGateway2.getAddress(), 
jobMasterGateway2.getFencingToken().toUUID());
+
+            // request the first slot
+            requestSlot(
+                    tmGateway,
+                    slotOffer1.getAllocationId(),
+                    slotOffer1.getSlotIndex(),
+                    jobId,
+                    resourceManagerGateway.getFencingToken(),
+                    jobMasterGateway1.getAddress());
+
+            // wait until first slot offer as arrived
+            offerSlotsLatch.await();
+
+            // request second slot, triggering another offer containing both 
slots
+            requestSlot(
+                    tmGateway,
+                    slotOffer2.getAllocationId(),
+                    slotOffer2.getSlotIndex(),
+                    jobId2,
+                    resourceManagerGateway.getFencingToken(),
+                    jobMasterGateway2.getAddress());
+
+            // wait until second slot offer as arrived
+            offerSlotsLatch.await();
+
+            
firstOfferResponseFuture.complete(Collections.singletonList(slotOffer1));
+            
firstOfferResponseFuture.complete(Collections.singletonList(slotOffer2));

Review comment:
       ```suggestion
               
secondOfferResponseFuture.complete(Collections.singletonList(slotOffer2));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to