[FLINK-9456] Let ResourceManager notify JobManager about failed/killed TaskManagers.
This closes #6132. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50c0ea8c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50c0ea8c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50c0ea8c Branch: refs/heads/master Commit: 50c0ea8c9fe17278d45aba476a95791152a1420b Parents: 432e48a Author: sihuazhou <[email protected]> Authored: Thu May 10 14:36:27 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Sun Jul 1 21:10:04 2018 +0200 ---------------------------------------------------------------------- .../clusterframework/types/TaskManagerSlot.java | 19 +++- .../flink/runtime/jobmaster/JobMaster.java | 5 + .../runtime/jobmaster/JobMasterGateway.java | 8 ++ .../runtime/jobmaster/slotpool/SlotPool.java | 2 +- .../resourcemanager/ResourceManager.java | 5 + .../slotmanager/SlotManager.java | 45 +++++--- .../runtime/taskexecutor/TaskExecutor.java | 3 +- .../exceptions/SlotOccupiedException.java | 16 ++- .../utils/TestingJobMasterGateway.java | 5 + .../slotmanager/SlotManagerTest.java | 103 ++++++++++++++++++- 10 files changed, 186 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java index fb7fce3..be39424 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.clusterframework.types; +import 
org.apache.flink.api.common.JobID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -45,6 +48,10 @@ public class TaskManagerSlot { /** Allocation id for which this slot has been allocated. */ private AllocationID allocationId; + /** Job id for which this slot has been allocated. */ + @Nullable + private JobID jobId; + /** Assigned slot request if there is currently an ongoing request. */ private PendingSlotRequest assignedSlotRequest; @@ -83,6 +90,10 @@ public class TaskManagerSlot { return allocationId; } + public JobID getJobId() { + return jobId; + } + public PendingSlotRequest getAssignedSlotRequest() { return assignedSlotRequest; } @@ -96,6 +107,7 @@ public class TaskManagerSlot { state = State.FREE; allocationId = null; + jobId = null; } public void clearPendingSlotRequest() { @@ -112,21 +124,24 @@ public class TaskManagerSlot { assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest); } - public void completeAllocation(AllocationID allocationId) { + public void completeAllocation(AllocationID allocationId, JobID jobId) { Preconditions.checkNotNull(allocationId, "Allocation id must not be null."); + Preconditions.checkNotNull(jobId, "Job id must not be null."); Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated."); Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request."); state = State.ALLOCATED; this.allocationId = allocationId; + this.jobId = jobId; assignedSlotRequest = null; } - public void updateAllocation(AllocationID allocationId) { + 
public void updateAllocation(AllocationID allocationId, JobID jobId) { Preconditions.checkState(state == State.FREE, "The slot has to be free in order to set an allocation id."); state = State.ALLOCATED; this.allocationId = Preconditions.checkNotNull(allocationId); + this.jobId = Preconditions.checkNotNull(jobId); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index e4a1b6a..7557bc3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -980,6 +980,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast operatorBackPressureStats.orElse(null))); } + @Override + public void notifyAllocationFailure(AllocationID allocationID, Exception cause) { + slotPool.failAllocation(allocationID, cause); + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 4ea9357..981222d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -278,4 +278,12 @@ public interface JobMasterGateway extends * not available (yet). */ CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId); + + /** + * Notifies that the allocation has failed. + * + * @param allocationID the failed allocation id. + * @param cause the reason that the allocation failed + */ + void notifyAllocationFailure(AllocationID allocationID, Exception cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 81b3e24..27440a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS allocatedSlot.releasePayload(cause); } else { - log.debug("Outdated request to fail slot [{}] with ", allocationID, cause); + log.trace("Outdated request to fail slot [{}] with ", allocationID, cause); } } // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 6e5c824..3ea5c2e 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -1013,6 +1013,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) { validateRunsInMainThread(); log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause); + + JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId); + if (jobManagerRegistration != null) { + jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index fe503b2..d0d03f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -343,6 +344,7 @@ public class SlotManager implements AutoCloseable { registerSlot( slotStatus.getSlotID(), slotStatus.getAllocationID(), + slotStatus.getJobID(), slotStatus.getResourceProfile(), 
taskExecutorConnection); } @@ -392,7 +394,7 @@ public class SlotManager implements AutoCloseable { if (null != taskManagerRegistration) { for (SlotStatus slotStatus : slotReport) { - updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID()); + updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID(), slotStatus.getJobID()); } return true; @@ -426,7 +428,7 @@ public class SlotManager implements AutoCloseable { slot.getInstanceId() + " which has not been registered."); } - updateSlotState(slot, taskManagerRegistration, null); + updateSlotState(slot, taskManagerRegistration, null, null); } else { LOG.debug("Received request to free slot {} with expected allocation id {}, " + "but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId()); @@ -515,6 +517,7 @@ public class SlotManager implements AutoCloseable { private void registerSlot( SlotID slotId, AllocationID allocationId, + JobID jobId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) { @@ -530,7 +533,7 @@ public class SlotManager implements AutoCloseable { slots.put(slotId, slot); - updateSlot(slotId, allocationId); + updateSlot(slotId, allocationId, jobId); } /** @@ -540,14 +543,14 @@ public class SlotManager implements AutoCloseable { * @param allocationId specifying the current allocation of the slot * @return True if the slot could be updated; otherwise false */ - private boolean updateSlot(SlotID slotId, AllocationID allocationId) { + private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) { final TaskManagerSlot slot = slots.get(slotId); if (slot != null) { final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); if (taskManagerRegistration != null) { - updateSlotState(slot, taskManagerRegistration, allocationId); + updateSlotState(slot, taskManagerRegistration, allocationId, jobId); return true; } else { @@ -561,7 +564,11 @@ public class 
SlotManager implements AutoCloseable { } } - private void updateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) { + private void updateSlotState( + TaskManagerSlot slot, + TaskManagerRegistration taskManagerRegistration, + @Nullable AllocationID allocationId, + @Nullable JobID jobId) { if (null != allocationId) { switch (slot.getState()) { case PENDING: @@ -575,12 +582,12 @@ public class SlotManager implements AutoCloseable { // remove the pending slot request, since it has been completed pendingSlotRequests.remove(pendingSlotRequest.getAllocationId()); - slot.completeAllocation(allocationId); + slot.completeAllocation(allocationId, jobId); } else { // we first have to free the slot in order to set a new allocationId slot.clearPendingSlotRequest(); // set the allocation id such that the slot won't be considered for the pending slot request - slot.updateAllocation(allocationId); + slot.updateAllocation(allocationId, jobId); // this will try to find a new slot for the request rejectPendingSlotRequest( @@ -593,13 +600,13 @@ public class SlotManager implements AutoCloseable { case ALLOCATED: if (!Objects.equals(allocationId, slot.getAllocationId())) { slot.freeSlot(); - slot.updateAllocation(allocationId); + slot.updateAllocation(allocationId, jobId); } break; case FREE: // the slot is currently free --> it is stored in freeSlots freeSlots.remove(slot.getSlotId()); - slot.updateAllocation(allocationId); + slot.updateAllocation(allocationId, jobId); taskManagerRegistration.occupySlot(); break; } @@ -660,15 +667,16 @@ public class SlotManager implements AutoCloseable { final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>(); final AllocationID allocationId = pendingSlotRequest.getAllocationId(); final SlotID slotId = taskManagerSlot.getSlotId(); + final InstanceID instanceID = taskManagerSlot.getInstanceId(); taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); 
pendingSlotRequest.setRequestFuture(completableFuture); - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId()); + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID); if (taskManagerRegistration == null) { throw new IllegalStateException("Could not find a registered task manager for instance id " + - taskManagerSlot.getInstanceId() + '.'); + instanceID + '.'); } taskManagerRegistration.markUsed(); @@ -695,11 +703,11 @@ public class SlotManager implements AutoCloseable { (Acknowledge acknowledge, Throwable throwable) -> { try { if (acknowledge != null) { - updateSlot(slotId, allocationId); + updateSlot(slotId, allocationId, pendingSlotRequest.getJobId()); } else { if (throwable instanceof SlotOccupiedException) { SlotOccupiedException exception = (SlotOccupiedException) throwable; - updateSlot(slotId, exception.getAllocationId()); + updateSlot(slotId, exception.getAllocationId(), exception.getJobId()); } else { removeSlotRequestFromSlot(slotId, allocationId); } @@ -765,8 +773,11 @@ public class SlotManager implements AutoCloseable { } AllocationID oldAllocationId = slot.getAllocationId(); - - fulfilledSlotRequests.remove(oldAllocationId); + if (oldAllocationId != null) { + fulfilledSlotRequests.remove(oldAllocationId); + resourceActions.notifyAllocationFailure( + slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed.")); + } } else { LOG.debug("There was no slot registered with slot id {}.", slotId); } @@ -798,7 +809,7 @@ public class SlotManager implements AutoCloseable { // clear the pending slot request taskManagerSlot.clearPendingSlotRequest(); - updateSlotState(taskManagerSlot, taskManagerRegistration, null); + updateSlotState(taskManagerSlot, taskManagerRegistration, null, null); } else { LOG.debug("Ignore slot request removal for slot {}.", slotId); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index dda2688..ae69e56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -762,7 +762,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { log.info(message); - throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber())); + final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()); + throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID)); } if (jobManagerTable.contains(jobId)) { http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java index 93e67a8..818754c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor.exceptions; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import 
org.apache.flink.util.Preconditions; @@ -26,22 +27,31 @@ public class SlotOccupiedException extends SlotAllocationException { private final AllocationID allocationId; - public SlotOccupiedException(String message, AllocationID allocationId) { + private final JobID jobId; + + public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) { super(message); this.allocationId = Preconditions.checkNotNull(allocationId); + this.jobId = jobId; } - public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) { + public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) { super(message, cause); this.allocationId = Preconditions.checkNotNull(allocationId); + this.jobId = jobId; } - public SlotOccupiedException(Throwable cause, AllocationID allocationId) { + public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) { super(cause); this.allocationId = Preconditions.checkNotNull(allocationId); + this.jobId = jobId; } public AllocationID getAllocationId() { return allocationId; } + + public JobID getJobId() { + return jobId; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 65117af..e887fc1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -166,6 +166,11 @@ public class TestingJobMasterGateway implements JobMasterGateway { } @Override + public void notifyAllocationFailure(AllocationID allocationID, 
Exception cause) { + throw new UnsupportedOperationException(); + } + + @Override public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index af6f3e4..1b072d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -48,7 +49,12 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import org.mockito.ArgumentCaptor; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -57,6 +63,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; 
+import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -1148,7 +1156,7 @@ public class SlotManagerTest extends TestLogger { } /** - * Testst that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call + * Tests that the SlotManager retries allocating a slot if the TaskExecutor#requestSlot call * fails. */ @Test @@ -1202,6 +1210,99 @@ public class SlotManagerTest extends TestLogger { } } + /** + * Tests that the job manager is notified of the failed allocations when the task manager fails or is killed. + */ + @Test + public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception { + + final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() { + @Override + public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) { + notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId)); + }})) { + + // register slot request for job1. + JobID jobId1 = new JobID(); + final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + slotManager.registerSlotRequest(slotRequest11); + slotManager.registerSlotRequest(slotRequest12); + + // create task-manager-1 with 2 slots. 
+ final ResourceID taskExecutorResourceId1 = ResourceID.generate(); + final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder() + .createTestingTaskExecutorGateway(); + final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1); + final Set<SlotStatus> tm1SlotStatusList = new HashSet<>(); + tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN)); + tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN)); + + // register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1. + slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList)); + + // register slot request for job2. + JobID jobId2 = new JobID(); + final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2"); + final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2"); + slotManager.registerSlotRequest(slotRequest21); + slotManager.registerSlotRequest(slotRequest22); + + // register slot request for job3. + JobID jobId3 = new JobID(); + final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3"); + slotManager.registerSlotRequest(slotRequest31); + + // create task-manager-2 with 4 slots. 
+ final ResourceID taskExecutorResourceId2 = ResourceID.generate(); + final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder() + .createTestingTaskExecutorGateway(); + final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2); + final Set<SlotStatus> tm2SlotStatusList = new HashSet<>(); + tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN)); + tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN)); + tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN)); + tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN)); + + // register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3. + slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList)); + + // --------------------- validate the notifications on task manager termination ------------------------ + + // validate for job1. + slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID()); + + assertEquals(2, notifiedTaskManagerInfos.size()); + + assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(0).f0)); + assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(1).f0)); + + assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()), + Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet())); + + notifiedTaskManagerInfos.clear(); + + // validate the result for job2 and job3. 
+ slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID()); + + assertEquals(3, notifiedTaskManagerInfos.size()); + + Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0)); + + assertEquals(2, job2AndJob3FailedAllocationInfo.size()); + + // validate for job2 + assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()), + job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet())); + + // validate for job3 + assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()), + job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet())); + } + } + private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { SlotManager slotManager = new SlotManager( TestingUtils.defaultScheduledExecutor(),
