tillrohrmann commented on a change in pull request #15612:
URL: https://github.com/apache/flink/pull/15612#discussion_r613191026
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1488,8 +1493,34 @@ private void
internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnec
JobID jobId,
JobMasterGateway jobMasterGateway,
JobMasterId jobMasterId,
- Collection<SlotOffer> offeredSlots) {
+ Collection<SlotOffer> offeredSlots,
+ int offerId) {
return (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
+ // check if this is the latest offer
+ if (offerId != offerCounter) {
+ // If this offer is outdated then it can be safely ignored.
+ // If the response for a given slot is identical in both
offers (accepted/rejected),
+ // then this is naturally the case since the end-result is the
same.
+ // If the responses differ, then there are 2 cases to consider:
+ // 1) initially rejected, later accepted
+ // This can happen when the resource requirements of a job
increases between
+ // offers.
+ // In this case the first response MUST be ignored, so that
+ // the the slot can be properly activated when the second
response arrives.
+ // 2) initially accepted, later rejected
+ // This can happen when the resource requirements of a job
decrease between
+ // offers.
+ // In this case the first response MAY be ignored, because
the job no longer
+ // requires the slot (and already has initiated steps to
free it) and we can thus
+ // assume that any in-flight task submissions are no longer
relevant for the job
+ // execution.
+
+ log.debug(
+ "Discard offer slot response since there is a newer
offer for the job {}.",
Review comment:
```suggestion
"Discard slot offer response since there is a newer
offer for the job {}.",
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -983,6 +985,139 @@ public void testSlotAcceptance() throws Exception {
}
}
+ private enum ResponseOrder {
+ ACCEPT_THEN_REJECT,
+ REJECT_THEN_ACCEPT
+ }
+
+ /**
+ * Tests that the task executor does not release a slot that was rejected
by the job master, if
+ * another slot offer is currently in progress.
+ */
+ @Test
+ public void testRejectedSlotNotFreedIfAnotherOfferIsPending() throws
Exception {
+
testSlotOfferResponseWithPendingSlotOffer(ResponseOrder.REJECT_THEN_ACCEPT);
+ }
+
+ /**
+ * Tests that the task executor does not activate a slot that was accepted
by the job master, if
+ * another slot offer is currently in progress.
+ */
+ @Test
+ public void testAcceptedSlotNotActivatedIfAnotherOfferIsPending() throws
Exception {
+
testSlotOfferResponseWithPendingSlotOffer(ResponseOrder.ACCEPT_THEN_REJECT);
+ }
+
+ /**
+ * Tests the behavior of the task executor when a slot offer response is
received while a newer
+ * slot offer is in progress.
+ */
+ private void testSlotOfferResponseWithPendingSlotOffer(final ResponseOrder
responseOrder)
+ throws Exception {
+ final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
+ final TestingResourceManagerGateway resourceManagerGateway =
+ createRmWithTmRegisterAndNotifySlotHooks(
+ new InstanceID(), taskExecutorIsRegistered, new
CompletableFuture<>());
+
+ final CompletableFuture<Collection<SlotOffer>>
firstOfferResponseFuture =
+ new CompletableFuture<>();
+ final CompletableFuture<Collection<SlotOffer>>
secondOfferResponseFuture =
+ new CompletableFuture<>();
+
+ final Queue<CompletableFuture<Collection<SlotOffer>>>
slotOfferResponses =
+ new ArrayDeque<>(
+ Arrays.asList(firstOfferResponseFuture,
secondOfferResponseFuture));
+
+ final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
+ final TestingJobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setOfferSlotsFunction(
+ (resourceID, slotOffers) -> {
+ offerSlotsLatch.trigger();
+ return slotOfferResponses.remove();
+ })
+ .build();
+
+ rpc.registerGateway(resourceManagerGateway.getAddress(),
resourceManagerGateway);
+ rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+ final TaskSlotTable<Task> taskSlotTable =
TaskSlotUtils.createTaskSlotTable(2);
+ final TaskManagerServices taskManagerServices =
+ createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
+ final TestingTaskExecutor taskExecutor =
createTestingTaskExecutor(taskManagerServices);
+
+ final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable =
+ new ThreadSafeTaskSlotTable<>(
+ taskSlotTable,
taskExecutor.getMainThreadExecutableForTesting());
+
+ final SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0,
ResourceProfile.ANY);
+ final SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1,
ResourceProfile.ANY);
+
+ try {
+ taskExecutor.start();
+ taskExecutor.waitUntilStarted();
+
+ final TaskExecutorGateway tmGateway =
+ taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+ // wait until task executor registered at the RM
+ taskExecutorIsRegistered.await();
+
+ // notify job leader to start slot offering
+ jobManagerLeaderRetriever.notifyListener(
+ jobMasterGateway.getAddress(),
jobMasterGateway.getFencingToken().toUUID());
+
+ // request the first slot
+ requestSlot(
+ tmGateway,
+ slotOffer1.getAllocationId(),
+ slotOffer1.getSlotIndex(),
+ resourceManagerGateway.getFencingToken(),
+ jobMasterGateway.getAddress());
+
+ // wait until first slot offer as arrived
+ offerSlotsLatch.await();
+
+ // request second slot, triggering another offer containing both
slots
+ requestSlot(
+ tmGateway,
+ slotOffer2.getAllocationId(),
+ slotOffer2.getSlotIndex(),
+ resourceManagerGateway.getFencingToken(),
+ jobMasterGateway.getAddress());
+
+ // wait until second slot offer as arrived
+ offerSlotsLatch.await();
+
+ switch (responseOrder) {
+ case ACCEPT_THEN_REJECT:
+ // accept the first offer, but reject both slots for the
second offer
+
firstOfferResponseFuture.complete(Collections.singletonList(slotOffer1));
+ assertThat(
+
threadSafeTaskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId),
+ empty());
+ secondOfferResponseFuture.completeExceptionally(
+ new RuntimeException("Test exception"));
+
assertThat(threadSafeTaskSlotTable.getAllocationIdsPerJob(jobId), empty());
+ return;
+ case REJECT_THEN_ACCEPT:
+ // fail the first offer, but accept both slots for the
second offer
+ // in the past the rejection of the first offer freed the
slot; when the slot is
+ // accepted from the second offer the activation of said
slot then failed
+ firstOfferResponseFuture.completeExceptionally(
+ new RuntimeException("Test exception"));
Review comment:
This is not a rejection: completing the future exceptionally signals a failed offer. To model a
rejection, we should instead complete the future with `Collections.emptyList()`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -983,6 +985,139 @@ public void testSlotAcceptance() throws Exception {
}
}
+ private enum ResponseOrder {
+ ACCEPT_THEN_REJECT,
Review comment:
Should we also add a test that explicitly verifies that an inactive slot can be freed?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -983,6 +985,139 @@ public void testSlotAcceptance() throws Exception {
}
}
+ private enum ResponseOrder {
+ ACCEPT_THEN_REJECT,
+ REJECT_THEN_ACCEPT
+ }
+
+ /**
+ * Tests that the task executor does not release a slot that was rejected
by the job master, if
+ * another slot offer is currently in progress.
+ */
+ @Test
+ public void testRejectedSlotNotFreedIfAnotherOfferIsPending() throws
Exception {
+
testSlotOfferResponseWithPendingSlotOffer(ResponseOrder.REJECT_THEN_ACCEPT);
+ }
+
+ /**
+ * Tests that the task executor does not activate a slot that was accepted
by the job master, if
+ * another slot offer is currently in progress.
+ */
+ @Test
+ public void testAcceptedSlotNotActivatedIfAnotherOfferIsPending() throws
Exception {
+
testSlotOfferResponseWithPendingSlotOffer(ResponseOrder.ACCEPT_THEN_REJECT);
+ }
+
+ /**
+ * Tests the behavior of the task executor when a slot offer response is
received while a newer
+ * slot offer is in progress.
+ */
+ private void testSlotOfferResponseWithPendingSlotOffer(final ResponseOrder
responseOrder)
+ throws Exception {
+ final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
+ final TestingResourceManagerGateway resourceManagerGateway =
+ createRmWithTmRegisterAndNotifySlotHooks(
+ new InstanceID(), taskExecutorIsRegistered, new
CompletableFuture<>());
+
+ final CompletableFuture<Collection<SlotOffer>>
firstOfferResponseFuture =
+ new CompletableFuture<>();
+ final CompletableFuture<Collection<SlotOffer>>
secondOfferResponseFuture =
+ new CompletableFuture<>();
+
+ final Queue<CompletableFuture<Collection<SlotOffer>>>
slotOfferResponses =
+ new ArrayDeque<>(
+ Arrays.asList(firstOfferResponseFuture,
secondOfferResponseFuture));
+
+ final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
+ final TestingJobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setOfferSlotsFunction(
+ (resourceID, slotOffers) -> {
+ offerSlotsLatch.trigger();
+ return slotOfferResponses.remove();
+ })
+ .build();
+
+ rpc.registerGateway(resourceManagerGateway.getAddress(),
resourceManagerGateway);
+ rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+
+ final TaskSlotTable<Task> taskSlotTable =
TaskSlotUtils.createTaskSlotTable(2);
+ final TaskManagerServices taskManagerServices =
+ createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
+ final TestingTaskExecutor taskExecutor =
createTestingTaskExecutor(taskManagerServices);
+
+ final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable =
+ new ThreadSafeTaskSlotTable<>(
+ taskSlotTable,
taskExecutor.getMainThreadExecutableForTesting());
+
+ final SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0,
ResourceProfile.ANY);
+ final SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1,
ResourceProfile.ANY);
+
+ try {
+ taskExecutor.start();
+ taskExecutor.waitUntilStarted();
+
+ final TaskExecutorGateway tmGateway =
+ taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+ // wait until task executor registered at the RM
+ taskExecutorIsRegistered.await();
+
+ // notify job leader to start slot offering
+ jobManagerLeaderRetriever.notifyListener(
+ jobMasterGateway.getAddress(),
jobMasterGateway.getFencingToken().toUUID());
+
+ // request the first slot
+ requestSlot(
+ tmGateway,
+ slotOffer1.getAllocationId(),
+ slotOffer1.getSlotIndex(),
+ resourceManagerGateway.getFencingToken(),
+ jobMasterGateway.getAddress());
+
+ // wait until first slot offer as arrived
+ offerSlotsLatch.await();
+
+ // request second slot, triggering another offer containing both
slots
+ requestSlot(
+ tmGateway,
+ slotOffer2.getAllocationId(),
+ slotOffer2.getSlotIndex(),
+ resourceManagerGateway.getFencingToken(),
+ jobMasterGateway.getAddress());
+
+ // wait until second slot offer as arrived
+ offerSlotsLatch.await();
+
+ switch (responseOrder) {
+ case ACCEPT_THEN_REJECT:
+ // accept the first offer, but reject both slots for the
second offer
+
firstOfferResponseFuture.complete(Collections.singletonList(slotOffer1));
+ assertThat(
+
threadSafeTaskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId),
+ empty());
+ secondOfferResponseFuture.completeExceptionally(
+ new RuntimeException("Test exception"));
Review comment:
I think this is not the rejection case: if the future completes exceptionally, Flink will try to
re-offer the slots. If we reject the slots, then we should also make sure that they are actually
freed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -1488,8 +1493,34 @@ private void
internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnec
JobID jobId,
JobMasterGateway jobMasterGateway,
JobMasterId jobMasterId,
- Collection<SlotOffer> offeredSlots) {
+ Collection<SlotOffer> offeredSlots,
+ int offerId) {
return (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
+ // check if this is the latest offer
+ if (offerId != offerCounter) {
Review comment:
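I think a global slot offer counter does not work. We need a per-job counter, because otherwise
it is not guaranteed that we will process a slot offer response for a given subset of slots (for
a given job). For example, an offer for job B would bump the global counter, so the response to a
still-pending offer for job A would be discarded as outdated, even though no newer offer for job A
exists.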
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]