[
https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290556#comment-16290556
]
ASF GitHub Bot commented on FLINK-8089:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5090#discussion_r156882139
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
---
@@ -383,6 +386,76 @@ public void
testSlotRequestCancellationUponFailingRequest() throws Exception {
}
}
+ /**
+ * Tests that unused offered slots are directly used to fulfil pending
slot
+ * requests.
+ *
+ * <p>See FLINK-8089
+ */
+ @Test
+ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws
Exception {
+ final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+ final JobMasterId jobMasterId = JobMasterId.generate();
+ final String jobMasterAddress = "foobar";
+ final CompletableFuture<AllocationID> allocationIdFuture = new
CompletableFuture<>();
+ final TestingResourceManagerGateway resourceManagerGateway =
new TestingResourceManagerGateway();
+
+ resourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) ->
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+ final SlotRequestID slotRequestId1 = new SlotRequestID();
+ final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+ try {
+ slotPool.start(jobMasterId, jobMasterAddress);
+
+ final SlotPoolGateway slotPoolGateway =
slotPool.getSelfGateway(SlotPoolGateway.class);
+
+ final ScheduledUnit scheduledUnit = new
ScheduledUnit(mock(Execution.class));
+
+
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+ CompletableFuture<LogicalSlot> slotFuture1 =
slotPoolGateway.allocateSlot(
+ slotRequestId1,
+ scheduledUnit,
+ ResourceProfile.UNKNOWN,
+ Collections.emptyList(),
+ timeout);
+
+ // wait for the first slot request
+ final AllocationID allocationId =
allocationIdFuture.get();
+
+ CompletableFuture<LogicalSlot> slotFuture2 =
slotPoolGateway.allocateSlot(
+ slotRequestId2,
+ scheduledUnit,
+ ResourceProfile.UNKNOWN,
+ Collections.emptyList(),
+ timeout);
+
+ slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+ try {
+ // this should fail with a CancellationException
+ slotFuture1.get();
+ fail("The first slot future should have failed
because it was cancelled.");
+ } catch (ExecutionException ee) {
+
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof
CancellationException);
+ }
+
+ final SlotOffer slotOffer = new SlotOffer(allocationId,
0, ResourceProfile.UNKNOWN);
+
+
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway,
slotOffer).get());
+
+ // the slot offer should fulfil the second slot request
--- End diff --
same here.
> Fulfil slot requests with unused offered slots
> ----------------------------------------------
>
> Key: FLINK-8089
> URL: https://issues.apache.org/jira/browse/FLINK-8089
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> The {{SlotPool}} adds unused offered slots to the list of available slots
> without checking whether another pending slot request could be fulfilled with
> this slot. This should be changed.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)