Repository: flink
Updated Branches:
  refs/heads/master 0e20b6130 -> e94a488dd


[FLINK-8504] [flip6] Deregister jobs from the JobLeaderService when no more 
slots allocated

Let the TaskExecutor deregister a job from the JobLeaderService once it has no
more slots allocated for that job.

This closes #5361.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e94a488d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e94a488d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e94a488d

Branch: refs/heads/master
Commit: e94a488dd78e7c2efdf55a67cea886ee15a641a6
Parents: 23ff120
Author: Till Rohrmann <[email protected]>
Authored: Thu Jan 25 13:50:43 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Jan 30 18:30:17 2018 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/JobLeaderService.java  |  40 +++---
 .../runtime/taskexecutor/TaskExecutor.java      |  80 ++++++------
 .../taskexecutor/slot/TaskSlotTable.java        |  45 ++++++-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 121 ++++++++++++++++++-
 4 files changed, 223 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 3b4da4e..5376362 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,11 +38,11 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 /**
@@ -80,7 +81,9 @@ public class JobLeaderService {
        public JobLeaderService(TaskManagerLocation location) {
                this.ownLocation = Preconditions.checkNotNull(location);
 
-               jobLeaderServices = new HashMap<>(4);
+               // Has to be a concurrent hash map because tests might access 
this service
+               // concurrently via containsJob
+               jobLeaderServices = new ConcurrentHashMap<>(4);
 
                state = JobLeaderService.State.CREATED;
 
@@ -147,18 +150,6 @@ public class JobLeaderService {
        }
 
        /**
-        * Check whether the service monitors the given job.
-        *
-        * @param jobId identifying the job
-        * @return True if the given job is monitored; otherwise false
-        */
-       public boolean containsJob(JobID jobId) {
-               Preconditions.checkState(JobLeaderService.State.STARTED == 
state, "The service is currently not running.");
-
-               return jobLeaderServices.containsKey(jobId);
-       }
-
-       /**
         * Remove the given job from being monitored by the job leader service.
         *
         * @param jobId identifying the job to remove from monitoring
@@ -199,9 +190,9 @@ public class JobLeaderService {
 
                JobLeaderService.JobManagerLeaderListener 
jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
 
-               leaderRetrievalService.start(jobManagerLeaderListener);
-
                jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, 
jobManagerLeaderListener));
+
+               leaderRetrievalService.start(jobManagerLeaderListener);
        }
 
        /**
@@ -435,4 +426,21 @@ public class JobLeaderService {
        private enum State {
                CREATED, STARTED, STOPPED
        }
+
+       // -----------------------------------------------------------
+       // Testing methods
+       // -----------------------------------------------------------
+
+       /**
+        * Check whether the service monitors the given job.
+        *
+        * @param jobId identifying the job
+        * @return True if the given job is monitored; otherwise false
+        */
+       @VisibleForTesting
+       public boolean containsJob(JobID jobId) {
+               Preconditions.checkState(JobLeaderService.State.STARTED == 
state, "The service is currently not running.");
+
+               return jobLeaderServices.containsKey(jobId);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ad7414c..9df2e88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -93,7 +93,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -163,7 +162,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        // --------- job manager connections -----------
 
-       private Map<ResourceID, JobManagerConnection> jobManagerConnections;
+       private final Map<ResourceID, JobManagerConnection> 
jobManagerConnections;
 
        // --------- task slot allocation table -----------
 
@@ -195,7 +194,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        JobLeaderService jobLeaderService,
                        FatalErrorHandler fatalErrorHandler) {
 
-               super(rpcService, 
AkkaRpcServiceUtils.createRandomName(TaskExecutor.TASK_MANAGER_NAME));
+               super(rpcService, 
AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
 
                checkArgument(taskManagerConfiguration.getNumberSlots() > 0, 
"The number of slots has to be larger than 0.");
 
@@ -978,10 +977,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        ResourceID resourceID,
                        JobMasterGateway jobMasterGateway,
                        int blobPort) {
-               Preconditions.checkNotNull(jobID);
-               Preconditions.checkNotNull(resourceID);
-               Preconditions.checkNotNull(jobMasterGateway);
-               Preconditions.checkArgument(blobPort > 0 && blobPort < 
MAX_BLOB_PORT, "Blob server port is out of range.");
+               checkNotNull(jobID);
+               checkNotNull(resourceID);
+               checkNotNull(jobMasterGateway);
+               checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob 
server port is out of range.");
 
                TaskManagerActions taskManagerActions = new 
TaskManagerActionsImpl(jobMasterGateway);
 
@@ -1029,7 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        }
 
        private void disassociateFromJobManager(JobManagerConnection 
jobManagerConnection, Exception cause) throws IOException {
-               Preconditions.checkNotNull(jobManagerConnection);
+               checkNotNull(jobManagerConnection);
                JobMasterGateway jobManagerGateway = 
jobManagerConnection.getJobManagerGateway();
                jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
                jobManagerConnection.getLibraryCacheManager().shutdown();
@@ -1104,36 +1103,40 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
        }
 
        private void freeSlotInternal(AllocationID allocationId, Throwable 
cause) {
-               Preconditions.checkNotNull(allocationId);
+               checkNotNull(allocationId);
 
                try {
-                       TaskSlot taskSlot = 
taskSlotTable.freeSlot(allocationId, cause);
+                       final JobID jobId = 
taskSlotTable.getOwningJob(allocationId);
 
-                       if (taskSlot != null && isConnectedToResourceManager()) 
{
-                               // the slot was freed. Tell the RM about it
-                               ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+                       final int slotIndex = 
taskSlotTable.freeSlot(allocationId, cause);
 
-                               resourceManagerGateway.notifySlotAvailable(
-                                       
resourceManagerConnection.getRegistrationId(),
-                                       new SlotID(getResourceID(), 
taskSlot.getIndex()),
-                                       allocationId);
+                       if (slotIndex != -1) {
 
-                               // check whether we still have allocated slots 
for the same job
-                               final JobID jobId = taskSlot.getJobId();
-                               final Iterator<Task> tasks = 
taskSlotTable.getTasks(jobId);
+                               if (isConnectedToResourceManager()) {
+                                       // the slot was freed. Tell the RM 
about it
+                                       ResourceManagerGateway 
resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
-                               if (!tasks.hasNext()) {
-                                       // we can remove the job from the job 
leader service
-                                       try {
-                                               
jobLeaderService.removeJob(jobId);
-                                       } catch (Exception e) {
-                                               log.info("Could not remove job 
{} from JobLeaderService.", jobId, e);
-                                       }
+                                       
resourceManagerGateway.notifySlotAvailable(
+                                               
resourceManagerConnection.getRegistrationId(),
+                                               new SlotID(getResourceID(), 
slotIndex),
+                                               allocationId);
+                               }
 
-                                       closeJobManagerConnection(
-                                               jobId,
-                                               new 
FlinkException("TaskExecutor " + getAddress() +
-                                                       " has no more allocated 
slots for job " + jobId + '.'));
+                               if (jobId != null) {
+                                       // check whether we still have 
allocated slots for the same job
+                                       if 
(taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) {
+                                               // we can remove the job from 
the job leader service
+                                               try {
+                                                       
jobLeaderService.removeJob(jobId);
+                                               } catch (Exception e) {
+                                                       log.info("Could not 
remove job {} from JobLeaderService.", jobId, e);
+                                               }
+
+                                               closeJobManagerConnection(
+                                                       jobId,
+                                                       new 
FlinkException("TaskExecutor " + getAddress() +
+                                                               " has no more 
allocated slots for job " + jobId + '.'));
+                                       }
                                }
                        }
                } catch (SlotNotFoundException e) {
@@ -1141,13 +1144,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
        }
 
-       private void freeSlotInternal(AllocationID allocationId) {
-               freeSlotInternal(allocationId, new Exception("The slot " + 
allocationId + " is being freed."));
-       }
-
        private void timeoutSlot(AllocationID allocationId, UUID ticket) {
-               Preconditions.checkNotNull(allocationId);
-               Preconditions.checkNotNull(ticket);
+               checkNotNull(allocationId);
+               checkNotNull(ticket);
 
                if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
                        freeSlotInternal(allocationId, new Exception("The slot 
" + allocationId + " has timed out."));
@@ -1285,7 +1284,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                private final JobMasterGateway jobMasterGateway;
 
                private TaskManagerActionsImpl(JobMasterGateway 
jobMasterGateway) {
-                       this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
+                       this.jobMasterGateway = checkNotNull(jobMasterGateway);
                }
 
                @Override
@@ -1318,7 +1317,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                @Override
                public void freeSlot(final AllocationID allocationId) {
-                       runAsync(() -> 
TaskExecutor.this.freeSlotInternal(allocationId));
+                       runAsync(() ->
+                               freeSlotInternal(
+                                       allocationId,
+                                       new FlinkException("TaskSlotTable 
requested freeing the TaskSlot " + allocationId + '.')));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index fcb2761..f8f9164 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -133,6 +133,22 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                slotActions = null;
        }
 
+       /**
+        * Returns all {@link AllocationID}s for the given job.
+        *
+        * @param jobId for which to return the set of {@link AllocationID}.
+        * @return Set of {@link AllocationID} for the given job
+        */
+       public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+               final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+               if (allocationIds == null) {
+                       return Collections.emptySet();
+               } else {
+                       return Collections.unmodifiableSet(allocationIds);
+               }
+       }
+
        // ---------------------------------------------------------------------
        // Slot report methods
        // ---------------------------------------------------------------------
@@ -268,7 +284,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * @throws SlotNotFoundException if there is not task slot for the 
given allocation id
         * @return Index of the freed slot if the slot could be freed; 
otherwise -1
         */
-       public TaskSlot freeSlot(AllocationID allocationId) throws 
SlotNotFoundException {
+       public int freeSlot(AllocationID allocationId) throws 
SlotNotFoundException {
                return freeSlot(allocationId, new Exception("The task slot of 
this task is being freed."));
        }
 
@@ -282,8 +298,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * @throws SlotNotFoundException if there is not task slot for the 
given allocation id
         * @return Index of the freed slot if the slot could be freed; 
otherwise -1
         */
-       @Nullable
-       public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) 
throws SlotNotFoundException {
+       public int freeSlot(AllocationID allocationId, Throwable cause) throws 
SlotNotFoundException {
                checkInit();
 
                TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -317,7 +332,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                                        slotsPerJob.remove(jobId);
                                }
 
-                               return taskSlot;
+                               return taskSlot.getIndex();
                        } else {
                                // we couldn't free the task slot because it 
still contains task, fail the tasks
                                // and set the slot state to releasing so that 
it gets eventually freed
@@ -329,7 +344,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                                        
taskIterator.next().failExternally(cause);
                                }
 
-                               return null;
+                               return -1;
                        }
                } else {
                        throw new SlotNotFoundException(allocationId);
@@ -422,6 +437,25 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
        }
 
+       /**
+        * Returns the owning job of the {@link TaskSlot} identified by the
+        * given {@link AllocationID}.
+        *
+        * @param allocationId identifying the slot for which to retrieve the 
owning job
+        * @return Owning job of the specified {@link TaskSlot} or null if 
there is no slot for
+        * the given allocation id or if the slot has no owning job assigned
+        */
+       @Nullable
+       public JobID getOwningJob(AllocationID allocationId) {
+               final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+               if (taskSlot != null) {
+                       return taskSlot.getJobId();
+               } else {
+                       return null;
+               }
+       }
+
        // ---------------------------------------------------------------------
        // Task methods
        // ---------------------------------------------------------------------
@@ -538,6 +572,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
        // Internal methods
        // ---------------------------------------------------------------------
 
+       @Nullable
        private TaskSlot getTaskSlot(AllocationID allocationId) {
                Preconditions.checkNotNull(allocationId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 0c3adae..efd27f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -61,12 +61,14 @@ import 
org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -89,6 +91,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -106,7 +109,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
@@ -122,6 +124,7 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -143,7 +146,6 @@ import static org.mockito.Mockito.when;
 public class TaskExecutorTest extends TestLogger {
 
        private final Time timeout = Time.milliseconds(10000L);
-       private final File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
 
        private TimerService<AllocationID> timerService;
 
@@ -1034,7 +1036,6 @@ public class TaskExecutorTest extends TestLogger {
                        mock(NetworkEnvironment.class),
                        haServices,
                        mock(HeartbeatServices.class, RETURNS_MOCKS),
-
                        mock(TaskManagerMetricGroup.class),
                        mock(BroadcastVariableManager.class),
                        mock(FileCache.class),
@@ -1544,6 +1545,120 @@ public class TaskExecutorTest extends TestLogger {
        }
 
        /**
+        * Tests that a job is removed from the JobLeaderService once a 
TaskExecutor has
+        * no more slots assigned to this job.
+        *
+        * <p>See FLINK-8504
+        */
+       @Test
+       public void testRemoveJobFromJobLeaderService() throws Exception {
+               final Configuration configuration = new Configuration();
+               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+               final LocalTaskManagerLocation localTaskManagerLocation = new 
LocalTaskManagerLocation();
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(localTaskManagerLocation);
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+               final TaskSlotTable taskSlotTable = new TaskSlotTable(
+                       Collections.singleton(ResourceProfile.UNKNOWN),
+                       timerService);
+
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+
+               final TestingLeaderRetrievalService 
resourceManagerLeaderRetriever = new TestingLeaderRetrievalService();
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+
+               final TaskExecutor taskExecutor = new TaskExecutor(
+                       rpc,
+                       taskManagerConfiguration,
+                       localTaskManagerLocation,
+                       mock(MemoryManager.class),
+                       mock(IOManager.class),
+                       new TaskExecutorLocalStateStoresManager(),
+                       mock(NetworkEnvironment.class),
+                       haServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       new BroadcastVariableManager(),
+                       mock(FileCache.class),
+                       taskSlotTable,
+                       new JobManagerTable(),
+                       jobLeaderService,
+                       testingFatalErrorHandler);
+
+               try {
+                       final TestingResourceManagerGateway 
resourceManagerGateway = new TestingResourceManagerGateway();
+                       final ResourceManagerId resourceManagerId = 
resourceManagerGateway.getFencingToken();
+
+                       
rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
+                       
resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(),
 resourceManagerId.toUUID());
+
+                       final JobID jobId = new JobID();
+
+                       final CompletableFuture<LeaderRetrievalListener> 
startFuture = new CompletableFuture<>();
+                       final CompletableFuture<Void> stopFuture = new 
CompletableFuture<>();
+
+                       final StartStopNotifyingLeaderRetrievalService 
jobMasterLeaderRetriever = new StartStopNotifyingLeaderRetrievalService(
+                               startFuture,
+                               stopFuture);
+                       haServices.setJobMasterLeaderRetriever(jobId, 
jobMasterLeaderRetriever);
+
+                       taskExecutor.start();
+
+                       final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+                       final SlotID slotId = new 
SlotID(localTaskManagerLocation.getResourceID(), 0);
+                       final AllocationID allocationId = new AllocationID();
+
+                       assertThat(startFuture.isDone(), is(false));
+                       assertThat(jobLeaderService.containsJob(jobId), 
is(false));
+
+                       taskExecutorGateway.requestSlot(
+                               slotId,
+                               jobId,
+                               allocationId,
+                               "foobar",
+                               resourceManagerId,
+                               timeout).get();
+
+                       // wait until the job leader retrieval service for 
jobId is started
+                       startFuture.get();
+                       assertThat(jobLeaderService.containsJob(jobId), 
is(true));
+
+                       taskExecutorGateway.freeSlot(allocationId, new 
FlinkException("Test exception"), timeout).get();
+
+                       // wait until the job leader retrieval service for jobId 
has stopped, because it should get removed
+                       stopFuture.get();
+                       assertThat(jobLeaderService.containsJob(jobId), 
is(false));
+
+                       testingFatalErrorHandler.rethrowError();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+               }
+       }
+
+       private static final class StartStopNotifyingLeaderRetrievalService 
implements LeaderRetrievalService {
+               private final CompletableFuture<LeaderRetrievalListener> 
startFuture;
+
+               private final CompletableFuture<Void> stopFuture;
+
+               private StartStopNotifyingLeaderRetrievalService(
+                               CompletableFuture<LeaderRetrievalListener> 
startFuture,
+                               CompletableFuture<Void> stopFuture) {
+                       this.startFuture = startFuture;
+                       this.stopFuture = stopFuture;
+               }
+
+               @Override
+               public void start(LeaderRetrievalListener listener) throws 
Exception {
+                       startFuture.complete(listener);
+               }
+
+               @Override
+               public void stop() throws Exception {
+                       stopFuture.complete(null);
+               }
+       }
+
+       /**
         * Special {@link HeartbeatServices} which creates a {@link 
RecordingHeartbeatManagerImpl}.
         */
        private static final class RecordingHeartbeatServices extends 
HeartbeatServices {

Reply via email to