[FLINK-9456] Let ResourceManager notify JobManager about failed/killed 
TaskManagers.

This closes #6132.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50c0ea8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50c0ea8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50c0ea8c

Branch: refs/heads/master
Commit: 50c0ea8c9fe17278d45aba476a95791152a1420b
Parents: 432e48a
Author: sihuazhou <[email protected]>
Authored: Thu May 10 14:36:27 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Sun Jul 1 21:10:04 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/TaskManagerSlot.java |  19 +++-
 .../flink/runtime/jobmaster/JobMaster.java      |   5 +
 .../runtime/jobmaster/JobMasterGateway.java     |   8 ++
 .../runtime/jobmaster/slotpool/SlotPool.java    |   2 +-
 .../resourcemanager/ResourceManager.java        |   5 +
 .../slotmanager/SlotManager.java                |  45 +++++---
 .../runtime/taskexecutor/TaskExecutor.java      |   3 +-
 .../exceptions/SlotOccupiedException.java       |  16 ++-
 .../utils/TestingJobMasterGateway.java          |   5 +
 .../slotmanager/SlotManagerTest.java            | 103 ++++++++++++++++++-
 10 files changed, 186 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
index fb7fce3..be39424 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,6 +48,10 @@ public class TaskManagerSlot {
        /** Allocation id for which this slot has been allocated. */
        private AllocationID allocationId;
 
+       /** Job id for which this slot has been allocated. */
+       @Nullable
+       private JobID jobId;
+
        /** Assigned slot request if there is currently an ongoing request. */
        private PendingSlotRequest assignedSlotRequest;
 
@@ -83,6 +90,10 @@ public class TaskManagerSlot {
                return allocationId;
        }
 
+       public JobID getJobId() {
+               return jobId;
+       }
+
        public PendingSlotRequest getAssignedSlotRequest() {
                return assignedSlotRequest;
        }
@@ -96,6 +107,7 @@ public class TaskManagerSlot {
 
                state = State.FREE;
                allocationId = null;
+               jobId = null;
        }
 
        public void clearPendingSlotRequest() {
@@ -112,21 +124,24 @@ public class TaskManagerSlot {
                assignedSlotRequest = 
Preconditions.checkNotNull(pendingSlotRequest);
        }
 
-       public void completeAllocation(AllocationID allocationId) {
+       public void completeAllocation(AllocationID allocationId, JobID jobId) {
                Preconditions.checkNotNull(allocationId, "Allocation id must 
not be null.");
+               Preconditions.checkNotNull(jobId, "Job id must not be null.");
                Preconditions.checkState(state == State.PENDING, "In order to 
complete an allocation, the slot has to be allocated.");
                Preconditions.checkState(Objects.equals(allocationId, 
assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the 
pending slot request.");
 
                state = State.ALLOCATED;
                this.allocationId = allocationId;
+               this.jobId = jobId;
                assignedSlotRequest = null;
        }
 
-       public void updateAllocation(AllocationID allocationId) {
+       public void updateAllocation(AllocationID allocationId, JobID jobId) {
                Preconditions.checkState(state == State.FREE, "The slot has to 
be free in order to set an allocation id.");
 
                state = State.ALLOCATED;
                this.allocationId = Preconditions.checkNotNull(allocationId);
+               this.jobId = Preconditions.checkNotNull(jobId);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e4a1b6a..7557bc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -980,6 +980,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        operatorBackPressureStats.orElse(null)));
        }
 
+       @Override
+       public void notifyAllocationFailure(AllocationID allocationID, 
Exception cause) {
+               slotPool.failAllocation(allocationID, cause);
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Internal methods
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4ea9357..981222d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -278,4 +278,12 @@ public interface JobMasterGateway extends
         * not available (yet).
         */
        CompletableFuture<OperatorBackPressureStatsResponse> 
requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+       /**
+        * Notifies that the allocation has failed.
+        *
+        * @param allocationID the failed allocation id.
+        * @param cause the reason that the allocation failed
+        */
+       void notifyAllocationFailure(AllocationID allocationID, Exception 
cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 81b3e24..27440a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                allocatedSlot.releasePayload(cause);
                        }
                        else {
-                               log.debug("Outdated request to fail slot [{}] 
with ", allocationID, cause);
+                               log.trace("Outdated request to fail slot [{}] 
with ", allocationID, cause);
                        }
                }
                // TODO: add some unit tests when the previous two are ready, 
the allocation may failed at any phase

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6e5c824..3ea5c2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1013,6 +1013,11 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                public void notifyAllocationFailure(JobID jobId, AllocationID 
allocationId, Exception cause) {
                        validateRunsInMainThread();
                        log.info("Slot request with allocation id {} for job {} 
failed.", allocationId, jobId, cause);
+
+                       JobManagerRegistration jobManagerRegistration = 
jobManagerRegistrations.get(jobId);
+                       if (jobManagerRegistration != null) {
+                               
jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
 cause);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index fe503b2..d0d03f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -343,6 +344,7 @@ public class SlotManager implements AutoCloseable {
                                registerSlot(
                                        slotStatus.getSlotID(),
                                        slotStatus.getAllocationID(),
+                                       slotStatus.getJobID(),
                                        slotStatus.getResourceProfile(),
                                        taskExecutorConnection);
                        }
@@ -392,7 +394,7 @@ public class SlotManager implements AutoCloseable {
                if (null != taskManagerRegistration) {
 
                        for (SlotStatus slotStatus : slotReport) {
-                               updateSlot(slotStatus.getSlotID(), 
slotStatus.getAllocationID());
+                               updateSlot(slotStatus.getSlotID(), 
slotStatus.getAllocationID(), slotStatus.getJobID());
                        }
 
                        return true;
@@ -426,7 +428,7 @@ public class SlotManager implements AutoCloseable {
                                                        slot.getInstanceId() + 
" which has not been registered.");
                                        }
 
-                                       updateSlotState(slot, 
taskManagerRegistration, null);
+                                       updateSlotState(slot, 
taskManagerRegistration, null, null);
                                } else {
                                        LOG.debug("Received request to free 
slot {} with expected allocation id {}, " +
                                                "but actual allocation id {} 
differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
@@ -515,6 +517,7 @@ public class SlotManager implements AutoCloseable {
        private void registerSlot(
                        SlotID slotId,
                        AllocationID allocationId,
+                       JobID jobId,
                        ResourceProfile resourceProfile,
                        TaskExecutorConnection taskManagerConnection) {
 
@@ -530,7 +533,7 @@ public class SlotManager implements AutoCloseable {
 
                slots.put(slotId, slot);
 
-               updateSlot(slotId, allocationId);
+               updateSlot(slotId, allocationId, jobId);
        }
 
        /**
@@ -540,14 +543,14 @@ public class SlotManager implements AutoCloseable {
         * @param allocationId specifying the current allocation of the slot
         * @return True if the slot could be updated; otherwise false
         */
-       private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
+       private boolean updateSlot(SlotID slotId, AllocationID allocationId, 
JobID jobId) {
                final TaskManagerSlot slot = slots.get(slotId);
 
                if (slot != null) {
                        final TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(slot.getInstanceId());
 
                        if (taskManagerRegistration != null) {
-                               updateSlotState(slot, taskManagerRegistration, 
allocationId);
+                               updateSlotState(slot, taskManagerRegistration, 
allocationId, jobId);
 
                                return true;
                        } else {
@@ -561,7 +564,11 @@ public class SlotManager implements AutoCloseable {
                }
        }
 
-       private void updateSlotState(TaskManagerSlot slot, 
TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID 
allocationId) {
+       private void updateSlotState(
+               TaskManagerSlot slot,
+               TaskManagerRegistration taskManagerRegistration,
+               @Nullable AllocationID allocationId,
+               @Nullable JobID jobId) {
                if (null != allocationId) {
                        switch (slot.getState()) {
                                case PENDING:
@@ -575,12 +582,12 @@ public class SlotManager implements AutoCloseable {
                                                // remove the pending slot 
request, since it has been completed
                                                
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
 
-                                               
slot.completeAllocation(allocationId);
+                                               
slot.completeAllocation(allocationId, jobId);
                                        } else {
                                                // we first have to free the 
slot in order to set a new allocationId
                                                slot.clearPendingSlotRequest();
                                                // set the allocation id such 
that the slot won't be considered for the pending slot request
-                                               
slot.updateAllocation(allocationId);
+                                               
slot.updateAllocation(allocationId, jobId);
 
                                                // this will try to find a new 
slot for the request
                                                rejectPendingSlotRequest(
@@ -593,13 +600,13 @@ public class SlotManager implements AutoCloseable {
                                case ALLOCATED:
                                        if (!Objects.equals(allocationId, 
slot.getAllocationId())) {
                                                slot.freeSlot();
-                                               
slot.updateAllocation(allocationId);
+                                               
slot.updateAllocation(allocationId, jobId);
                                        }
                                        break;
                                case FREE:
                                        // the slot is currently free --> it is 
stored in freeSlots
                                        freeSlots.remove(slot.getSlotId());
-                                       slot.updateAllocation(allocationId);
+                                       slot.updateAllocation(allocationId, 
jobId);
                                        taskManagerRegistration.occupySlot();
                                        break;
                        }
@@ -660,15 +667,16 @@ public class SlotManager implements AutoCloseable {
                final CompletableFuture<Acknowledge> completableFuture = new 
CompletableFuture<>();
                final AllocationID allocationId = 
pendingSlotRequest.getAllocationId();
                final SlotID slotId = taskManagerSlot.getSlotId();
+               final InstanceID instanceID = taskManagerSlot.getInstanceId();
 
                taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
                pendingSlotRequest.setRequestFuture(completableFuture);
 
-               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceID);
 
                if (taskManagerRegistration == null) {
                        throw new IllegalStateException("Could not find a 
registered task manager for instance id " +
-                               taskManagerSlot.getInstanceId() + '.');
+                               instanceID + '.');
                }
 
                taskManagerRegistration.markUsed();
@@ -695,11 +703,11 @@ public class SlotManager implements AutoCloseable {
                        (Acknowledge acknowledge, Throwable throwable) -> {
                                try {
                                        if (acknowledge != null) {
-                                               updateSlot(slotId, 
allocationId);
+                                               updateSlot(slotId, 
allocationId, pendingSlotRequest.getJobId());
                                        } else {
                                                if (throwable instanceof 
SlotOccupiedException) {
                                                        SlotOccupiedException 
exception = (SlotOccupiedException) throwable;
-                                                       updateSlot(slotId, 
exception.getAllocationId());
+                                                       updateSlot(slotId, 
exception.getAllocationId(), exception.getJobId());
                                                } else {
                                                        
removeSlotRequestFromSlot(slotId, allocationId);
                                                }
@@ -765,8 +773,11 @@ public class SlotManager implements AutoCloseable {
                        }
 
                        AllocationID oldAllocationId = slot.getAllocationId();
-
-                       fulfilledSlotRequests.remove(oldAllocationId);
+                       if (oldAllocationId != null) {
+                               fulfilledSlotRequests.remove(oldAllocationId);
+                               resourceActions.notifyAllocationFailure(
+                                       slot.getJobId(), oldAllocationId, new 
Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+                       }
                } else {
                        LOG.debug("There was no slot registered with slot id 
{}.", slotId);
                }
@@ -798,7 +809,7 @@ public class SlotManager implements AutoCloseable {
                                // clear the pending slot request
                                taskManagerSlot.clearPendingSlotRequest();
 
-                               updateSlotState(taskManagerSlot, 
taskManagerRegistration, null);
+                               updateSlotState(taskManagerSlot, 
taskManagerRegistration, null, null);
                        } else {
                                LOG.debug("Ignore slot request removal for slot 
{}.", slotId);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index dda2688..ae69e56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -762,7 +762,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                                log.info(message);
 
-                               throw new SlotOccupiedException(message, 
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
+                               final AllocationID allocationID = 
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
+                               throw new SlotOccupiedException(message, 
allocationID, taskSlotTable.getOwningJob(allocationID));
                        }
 
                        if (jobManagerTable.contains(jobId)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
index 93e67a8..818754c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor.exceptions;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.util.Preconditions;
 
@@ -26,22 +27,31 @@ public class SlotOccupiedException extends 
SlotAllocationException {
 
        private final AllocationID allocationId;
 
-       public SlotOccupiedException(String message, AllocationID allocationId) 
{
+       private final JobID jobId;
+
+       public SlotOccupiedException(String message, AllocationID allocationId, 
JobID jobId) {
                super(message);
                this.allocationId = Preconditions.checkNotNull(allocationId);
+               this.jobId = jobId;
        }
 
-       public SlotOccupiedException(String message, Throwable cause, 
AllocationID allocationId) {
+       public SlotOccupiedException(String message, Throwable cause, 
AllocationID allocationId, JobID jobId) {
                super(message, cause);
                this.allocationId = Preconditions.checkNotNull(allocationId);
+               this.jobId = jobId;
        }
 
-       public SlotOccupiedException(Throwable cause, AllocationID 
allocationId) {
+       public SlotOccupiedException(Throwable cause, AllocationID 
allocationId, JobID jobId) {
                super(cause);
                this.allocationId = Preconditions.checkNotNull(allocationId);
+               this.jobId = jobId;
        }
 
        public AllocationID getAllocationId() {
                return allocationId;
        }
+
+       public JobID getJobId() {
+               return jobId;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 65117af..e887fc1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -166,6 +166,11 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        }
 
        @Override
+       public void notifyAllocationFailure(AllocationID allocationID, 
Exception cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID 
executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, 
TaskStateSnapshot subtaskState) {
                throw new UnsupportedOperationException();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/50c0ea8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index af6f3e4..1b072d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -48,7 +49,12 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -57,6 +63,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -1148,7 +1156,7 @@ public class SlotManagerTest extends TestLogger {
        }
 
        /**
-        * Testst that the SlotManager retries allocating a slot if the 
TaskExecutor#requestSlot call
+        * Tests that the SlotManager retries allocating a slot if the 
TaskExecutor#requestSlot call
         * fails.
         */
        @Test
@@ -1202,6 +1210,99 @@ public class SlotManagerTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that the job manager is notified about its failed allocations when the task 
manager fails or is killed.
+        */
+       @Test
+       public void testNotifyFailedAllocationWhenTaskManagerTerminated() 
throws Exception {
+
+               final List<Tuple2<JobID, AllocationID>> 
notifiedTaskManagerInfos = new ArrayList<>();
+
+               try (final SlotManager slotManager = 
createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+                               @Override
+                               public void notifyAllocationFailure(JobID 
jobId, AllocationID allocationId, Exception cause) {
+                                       notifiedTaskManagerInfos.add(new 
Tuple2<>(jobId, allocationId));
+                               }})) {
+
+                       // register slot request for job1.
+                       JobID jobId1 = new JobID();
+                       final SlotRequest slotRequest11 = new 
SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+                       final SlotRequest slotRequest12 = new 
SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+                       slotManager.registerSlotRequest(slotRequest11);
+                       slotManager.registerSlotRequest(slotRequest12);
+
+                       // create task-manager-1 with 2 slots.
+                       final ResourceID taskExecutorResourceId1 = 
ResourceID.generate();
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
+                               .createTestingTaskExecutorGateway();
+                       final TaskExecutorConnection taskExecutionConnection1 = 
new TaskExecutorConnection(taskExecutorResourceId1, 
testingTaskExecutorGateway1);
+                       final Set<SlotStatus> tm1SlotStatusList = new 
HashSet<>();
+                       tm1SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
+                       tm1SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
+
+                       // register the task-manager-1 to the slot manager, 
this will trigger the slot allocation for job1.
+                       
slotManager.registerTaskManager(taskExecutionConnection1, new 
SlotReport(tm1SlotStatusList));
+
+                       // register slot request for job2.
+                       JobID jobId2 = new JobID();
+                       final SlotRequest slotRequest21 = new 
SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+                       final SlotRequest slotRequest22 = new 
SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
+                       slotManager.registerSlotRequest(slotRequest21);
+                       slotManager.registerSlotRequest(slotRequest22);
+
+                       // register slot request for job3.
+                       JobID jobId3 = new JobID();
+                       final SlotRequest slotRequest31 = new 
SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
+                       slotManager.registerSlotRequest(slotRequest31);
+
+                       // create task-manager-2 with 4 slots.
+                       final ResourceID taskExecutorResourceId2 = 
ResourceID.generate();
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
+                               .createTestingTaskExecutorGateway();
+                       final TaskExecutorConnection taskExecutionConnection2 = 
new TaskExecutorConnection(taskExecutorResourceId2, 
testingTaskExecutorGateway2);
+                       final Set<SlotStatus> tm2SlotStatusList = new 
HashSet<>();
+                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
+                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
+                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
+                       tm2SlotStatusList.add(new SlotStatus(new 
SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
+
+                       // register the task-manager-2 to the slot manager, 
this will trigger the slot allocation for job2 and job3.
+                       
slotManager.registerTaskManager(taskExecutionConnection2, new 
SlotReport(tm2SlotStatusList));
+
+                       // --------------------- validate the notification of terminated 
task managers ------------------------
+
+                       // validate the notifications for job1.
+                       
slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
+
+                       assertEquals(2, notifiedTaskManagerInfos.size());
+
+                       assertThat(jobId1, 
equalTo(notifiedTaskManagerInfos.get(0).f0));
+                       assertThat(jobId1, 
equalTo(notifiedTaskManagerInfos.get(1).f0));
+
+                       assertEquals(Stream.of(slotRequest11.getAllocationId(), 
slotRequest12.getAllocationId()).collect(Collectors.toSet()),
+                               Stream.of(notifiedTaskManagerInfos.get(0).f1, 
notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
+
+                       notifiedTaskManagerInfos.clear();
+
+                       // validate the notifications for job2 and job3.
+                       
slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
+
+                       assertEquals(3, notifiedTaskManagerInfos.size());
+
+                       Map<JobID, List<Tuple2<JobID, AllocationID>>> 
job2AndJob3FailedAllocationInfo = 
notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> 
tuple.f0));
+
+                       assertEquals(2, job2AndJob3FailedAllocationInfo.size());
+
+                       // validate for job2
+                       assertEquals(Stream.of(slotRequest21.getAllocationId(), 
slotRequest22.getAllocationId()).collect(Collectors.toSet()),
+                               
job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> 
tuple2.f1).collect(Collectors.toSet()));
+
+                       // validate for job3
+                       
assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
+                               
job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> 
tuple2.f1).collect(Collectors.toSet()));
+               }
+       }
+
        private SlotManager createSlotManager(ResourceManagerId 
resourceManagerId, ResourceActions resourceManagerActions) {
                SlotManager slotManager = new SlotManager(
                        TestingUtils.defaultScheduledExecutor(),

Reply via email to