azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441359953
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
         }
     }
+    @Test
+    public void testOrphanedAllocationCanBeRemapped() throws Exception {
+        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+            final List<AllocationID> allocationIds = new ArrayList<>();
+            resourceManagerGateway.setRequestSlotConsumer(
+                slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+            final List<AllocationID> canceledAllocations = new ArrayList<>();
+            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+            setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+            final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
Review comment:
Why do we use `Scheduler` to unit-test `SlotPoolImpl`?
Why not call `SlotPoolImpl` directly, as in
`testFailingAllocationFailsRemappedPendingSlotRequests`?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
         }
     }
+    @Test
+    public void testOrphanedAllocationCanBeRemapped() throws Exception {
+        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+            final List<AllocationID> allocationIds = new ArrayList<>();
+            resourceManagerGateway.setRequestSlotConsumer(
+                slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+            final List<AllocationID> canceledAllocations = new ArrayList<>();
+            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+            setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+            final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+            final SlotRequestId slotRequestId1 = new SlotRequestId();
+            scheduler.allocateSlot(
+                slotRequestId1,
+                new DummyScheduledUnit(),
+                SlotProfile.noRequirements(),
+                timeout);
Review comment:
```suggestion
allocateSlot(scheduler, slotRequestId1);
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
         }
     }
+    @Test
+    public void testOrphanedAllocationCanBeRemapped() throws Exception {
+        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+            final List<AllocationID> allocationIds = new ArrayList<>();
+            resourceManagerGateway.setRequestSlotConsumer(
+                slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+            final List<AllocationID> canceledAllocations = new ArrayList<>();
+            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
Review comment:
nit: maybe not now, but if this setup can also be reused in other tests, it
would be nice to have something like an RM harness:
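```
class RmHarness {
    private final List<AllocationID> allocationIds = new ArrayList<>();
    private final List<AllocationID> canceledAllocations = new ArrayList<>();

    // Registers both consumers on the testing gateway, as the test does inline.
    RmHarness(TestingResourceManagerGateway resourceManagerGateway) {
        resourceManagerGateway.setRequestSlotConsumer(
            slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
        resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
    }

    List<AllocationID> getAllocations() { return allocationIds; }
    List<AllocationID> getCanceled() { return canceledAllocations; }
}
```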
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
         }
     }
+    @Test
+    public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+        final List<AllocationID> allocations = new ArrayList<>();
+        resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+        try (SlotPoolImpl slotPool = setUpSlotPool()) {
Review comment:
Could we also reuse `setUpSlotPool` in `SlotPoolImplTest`, to deduplicate the setup?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
         }
     }
+    @Test
+    public void testOrphanedAllocationCanBeRemapped() throws Exception {
+        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+            final List<AllocationID> allocationIds = new ArrayList<>();
+            resourceManagerGateway.setRequestSlotConsumer(
+                slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+            final List<AllocationID> canceledAllocations = new ArrayList<>();
+            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+            setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+            final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+            final SlotRequestId slotRequestId1 = new SlotRequestId();
+            scheduler.allocateSlot(
+                slotRequestId1,
+                new DummyScheduledUnit(),
+                SlotProfile.noRequirements(),
+                timeout);
Review comment:
There is already a private method, `SlotPoolImplTest#allocateSlot`, for
this.
I would also extend that method to accept several request IDs and submit the
allocations in the order given:
`allocateSlot(Scheduler scheduler, SlotRequestId... slotRequestIds)`
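A rough sketch of the varargs shape, in case it helps. The `SlotRequestId` and `Scheduler` types below are hypothetical stand-ins so the snippet compiles on its own; they are not the real Flink classes, and the real helper would presumably keep passing `DummyScheduledUnit`, `SlotProfile.noRequirements()` and `timeout` as the test does today.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SlotRequestId (assumption, not the real class).
final class SlotRequestId {}

// Hypothetical stand-in for the scheduler; the real allocateSlot takes more arguments.
interface Scheduler {
    void allocateSlot(SlotRequestId slotRequestId);
}

final class AllocateSlotSketch {
    // Submits one allocation per request id, in argument order.
    static void allocateSlot(Scheduler scheduler, SlotRequestId... slotRequestIds) {
        for (SlotRequestId slotRequestId : slotRequestIds) {
            scheduler.allocateSlot(slotRequestId);
        }
    }

    // Records the ids as the stand-in scheduler receives them.
    static List<SlotRequestId> demo(SlotRequestId... slotRequestIds) {
        final List<SlotRequestId> seen = new ArrayList<>();
        allocateSlot(seen::add, slotRequestIds);
        return seen;
    }
}
```

Existing single-id call sites such as `allocateSlot(scheduler, slotRequestId1)` would compile unchanged against a varargs signature.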
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##########
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
         }
     }
+    @Test
+    public void testOrphanedAllocationCanBeRemapped() throws Exception {
+        try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+            final List<AllocationID> allocationIds = new ArrayList<>();
+            resourceManagerGateway.setRequestSlotConsumer(
+                slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+            final List<AllocationID> canceledAllocations = new ArrayList<>();
+            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
Review comment:
This gateway setup can be partially reused in
`testFailingAllocationFailsRemappedPendingSlotRequests`.