azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441359953



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
                }
        }
 
+       @Test
+       public void testOrphanedAllocationCanBeRemapped() throws Exception {
+               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+                       final List<AllocationID> allocationIds = new ArrayList<>();
+                       resourceManagerGateway.setRequestSlotConsumer(
+                               slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+                       final List<AllocationID> canceledAllocations = new ArrayList<>();
+                       resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+                       setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+                       final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

Review comment:
       Why do we use `Scheduler` to unit test `SlotPoolImpl`?
   Why not call `SlotPoolImpl` directly, as in
`testFailingAllocationFailsRemappedPendingSlotRequests`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
                }
        }
 
+       @Test
+       public void testOrphanedAllocationCanBeRemapped() throws Exception {
+               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+                       final List<AllocationID> allocationIds = new ArrayList<>();
+                       resourceManagerGateway.setRequestSlotConsumer(
+                               slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+                       final List<AllocationID> canceledAllocations = new ArrayList<>();
+                       resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+                       setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+                       final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+                       final SlotRequestId slotRequestId1 = new SlotRequestId();
+                       scheduler.allocateSlot(
+                               slotRequestId1,
+                               new DummyScheduledUnit(),
+                               SlotProfile.noRequirements(),
+                               timeout);

Review comment:
       ```suggestion
                        allocateSlot(scheduler, slotRequestId1);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
                }
        }
 
+       @Test
+       public void testOrphanedAllocationCanBeRemapped() throws Exception {
+               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+                       final List<AllocationID> allocationIds = new ArrayList<>();
+                       resourceManagerGateway.setRequestSlotConsumer(
+                               slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+                       final List<AllocationID> canceledAllocations = new ArrayList<>();
+                       resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
       nit: maybe not now, but if it can also be reused in other tests, it
would be nice to have something like an RM harness:
   ```
   class RmHarness {
   final List<AllocationID> allocationIds = new ArrayList<>();
   final List<AllocationID> canceledAllocations = new ArrayList<>();
   RmHarness(resourceManagerGateway)
   getAllocations
   getCanceled
   }
   ```
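
   A slightly fuller sketch of that harness idea, to make the shape concrete. It is only an illustration: `FakeAllocationId` and `FakeResourceManagerGateway` are hypothetical stand-ins so the snippet compiles on its own, and the stand-in consumers receive the allocation id directly, whereas the request consumer in the test above receives a slot request and calls `getAllocationId()` on it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for AllocationID, not the Flink class.
final class FakeAllocationId {}

// Hypothetical stand-in for the testing gateway: only the two consumer hooks.
final class FakeResourceManagerGateway {
    private Consumer<FakeAllocationId> requestSlotConsumer = id -> {};
    private Consumer<FakeAllocationId> cancelSlotConsumer = id -> {};

    void setRequestSlotConsumer(Consumer<FakeAllocationId> consumer) {
        this.requestSlotConsumer = consumer;
    }

    void setCancelSlotConsumer(Consumer<FakeAllocationId> consumer) {
        this.cancelSlotConsumer = consumer;
    }

    // Simulate the calls a slot pool would make against the gateway.
    void requestSlot(FakeAllocationId id) {
        requestSlotConsumer.accept(id);
    }

    void cancelSlotRequest(FakeAllocationId id) {
        cancelSlotConsumer.accept(id);
    }
}

// Registers both consumers once and exposes what they recorded.
final class RmHarness {
    private final List<FakeAllocationId> allocationIds = new ArrayList<>();
    private final List<FakeAllocationId> canceledAllocations = new ArrayList<>();

    RmHarness(FakeResourceManagerGateway gateway) {
        gateway.setRequestSlotConsumer(allocationIds::add);
        gateway.setCancelSlotConsumer(canceledAllocations::add);
    }

    List<FakeAllocationId> getAllocations() {
        return Collections.unmodifiableList(allocationIds);
    }

    List<FakeAllocationId> getCanceled() {
        return Collections.unmodifiableList(canceledAllocations);
    }
}
```

   With something like this, a test would construct the harness once and assert on `getAllocations()` and `getCanceled()` instead of declaring both lists and both consumers inline.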

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
                }
        }
 
+       @Test
+       public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+               final List<AllocationID> allocations = new ArrayList<>();
+               resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+               try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
       Could we also reuse `setUpSlotPool` in `SlotPoolImplTest`, to deduplicate the setup?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
                }
        }
 
+       @Test
+       public void testOrphanedAllocationCanBeRemapped() throws Exception {
+               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+                       final List<AllocationID> allocationIds = new ArrayList<>();
+                       resourceManagerGateway.setRequestSlotConsumer(
+                               slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+                       final List<AllocationID> canceledAllocations = new ArrayList<>();
+                       resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+                       setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+                       final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+                       final SlotRequestId slotRequestId1 = new SlotRequestId();
+                       scheduler.allocateSlot(
+                               slotRequestId1,
+                               new DummyScheduledUnit(),
+                               SlotProfile.noRequirements(),
+                               timeout);

Review comment:
       There is already a private method `SlotPoolImplTest#allocateSlot` for
this.
   I would also extend this private method so that it submits multiple
allocations at once, in the order given:
   `allocateSlot(Scheduler scheduler, SlotRequestId... slotRequestIds)`
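
   A rough sketch of what such a varargs helper could look like. The types here (`FakeSlotRequestId`, `FakeLogicalSlot`, `FakeScheduler`) and the `AllocationHelper` holder class are hypothetical stand-ins so the snippet compiles on its own. Returning the futures as a list is also an assumption; the existing helper's return type is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for SlotRequestId, LogicalSlot and Scheduler.
final class FakeSlotRequestId {}

final class FakeLogicalSlot {}

interface FakeScheduler {
    CompletableFuture<FakeLogicalSlot> allocateSlot(FakeSlotRequestId slotRequestId);
}

final class AllocationHelper {
    // Submits one allocation per id, in argument order, and returns the
    // resulting futures in the same order.
    static List<CompletableFuture<FakeLogicalSlot>> allocateSlot(
            FakeScheduler scheduler, FakeSlotRequestId... slotRequestIds) {
        final List<CompletableFuture<FakeLogicalSlot>> futures =
                new ArrayList<>(slotRequestIds.length);
        for (FakeSlotRequestId slotRequestId : slotRequestIds) {
            futures.add(scheduler.allocateSlot(slotRequestId));
        }
        return futures;
    }
}
```

   Existing single-id call sites would keep compiling against the varargs signature, apart from any code that relies on the current return type.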

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
                }
        }
 
+       @Test
+       public void testOrphanedAllocationCanBeRemapped() throws Exception {
+               try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+                       final List<AllocationID> allocationIds = new ArrayList<>();
+                       resourceManagerGateway.setRequestSlotConsumer(
+                               slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+                       final List<AllocationID> canceledAllocations = new ArrayList<>();
+                       resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
       This setup can be partially reused in
`testFailingAllocationFailsRemappedPendingSlotRequests`.



