tillrohrmann commented on a change in pull request #9339:
[FLINK-13555][runtime] SlotPool fails batch slot requests immediately if they
are unfulfillable.
URL: https://github.com/apache/flink/pull/9339#discussion_r310552583
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
##########
@@ -147,14 +148,42 @@ public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exc
             final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
-            SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get());
+            SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed request"));
             assertThat(slotFuture.isDone(), is(false));
         }
     }
     /**
-     * Tests that a batch slot request won't fail if its resource manager request fails.
+     * Tests that a batch slot request does react to {@link SlotPool#failAllocation(AllocationID, Exception)}
+     * signals whose exception is {@link UnfulfillableSlotRequestException}.
+     */
+    @Test
+    public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillable() throws Exception {
+        final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+        final Time batchSlotTimeout = Time.milliseconds(1000L);
+        try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
+            .setBatchSlotTimeout(batchSlotTimeout)
+            .setResourceManagerGateway(testingResourceManagerGateway)
+            .build()) {
+
+            final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
+
+            SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(),
+                new UnfulfillableSlotRequestException());
+
+            assertThat(slotFuture.isDone(), is(true));
Review comment:
I guess we should check that the `slotFuture` has been completed
exceptionally.
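To illustrate the point: `isDone()` is also true for a future that completed normally, so it cannot tell a fulfilled request from a failed one. The following is only a minimal, JDK-only sketch of a stricter check, not the change that went into the PR. `ExceptionalCompletionCheck`, `failureCause`, and `UnfulfillableRequestException` are hypothetical names made up for this example; the last one merely stands in for Flink's `UnfulfillableSlotRequestException`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionalCompletionCheck {

    /** Hypothetical stand-in for Flink's UnfulfillableSlotRequestException. */
    static class UnfulfillableRequestException extends Exception {
        UnfulfillableRequestException(String message) {
            super(message);
        }
    }

    /**
     * Returns the exception the future failed with, or null if the future
     * is still pending or completed normally.
     */
    static Throwable failureCause(CompletableFuture<?> future) throws InterruptedException {
        if (!future.isCompletedExceptionally()) {
            return null;
        }
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // get() wraps the original failure in an ExecutionException.
            return e.getCause();
        } catch (CancellationException e) {
            // A cancelled future also counts as exceptionally completed.
            return e;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> fulfilled = CompletableFuture.completedFuture("slot");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new UnfulfillableRequestException("no matching slot"));

        // Both futures are done, so isDone() alone does not distinguish them.
        System.out.println("fulfilled.isDone() = " + fulfilled.isDone());
        System.out.println("failed.isDone()    = " + failed.isDone());

        // isCompletedExceptionally() and the unwrapped cause do.
        System.out.println("fulfilled cause    = " + failureCause(fulfilled));
        System.out.println("failed cause       = " + failureCause(failed));
    }
}
```

In the test under review, the analogous assertion would presumably be on `slotFuture.isCompletedExceptionally()`, optionally followed by a check that the cause is the expected exception type.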