Repository: flink
Updated Branches:
  refs/heads/master bbac4a6c9 -> 40656c5df


[FLINK-7077] [mesos] Implement task release to support dynamic scaling

- SlotManager: fix idleness tracking (`markIdle` no longer resets `idleSince` on every call)
- ResourceManager: change the `stopWorker` method to take a `ResourceID`
- ResourceManager: schedule callbacks from `ResourceManagerActions` onto the main thread
- MesosResourceManager: implement `stopWorker`
- MesosResourceManager: fix routing of messages from child actors to the RM

This closes #4560.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40656c5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40656c5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40656c5d

Branch: refs/heads/master
Commit: 40656c5dfdab7c0a1a7794dfe3a5f661f6156c6f
Parents: bbac4a6
Author: Wright, Eron <eron.wri...@emc.com>
Authored: Thu Aug 17 18:22:55 2017 -0700
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Aug 19 17:28:01 2017 +0200

----------------------------------------------------------------------
 .../MesosFlinkResourceManager.java              |  2 +-
 .../clusterframework/MesosResourceManager.java  | 35 ++++++++++++++---
 .../apache/flink/mesos/scheduler/Tasks.scala    |  8 ++--
 .../MesosResourceManagerTest.java               | 41 +++++++++++++++++---
 .../flink/mesos/scheduler/TasksTest.scala       |  2 +-
 .../resourcemanager/ResourceManager.java        | 35 +++++++++++++++--
 .../StandaloneResourceManager.java              |  3 +-
 .../slotmanager/SlotManager.java                |  3 ++
 .../slotmanager/TaskManagerRegistration.java    |  4 +-
 .../clusterframework/ResourceManagerTest.java   |  2 +-
 .../TestingLeaderElectionService.java           | 37 +++++++++++++-----
 .../apache/flink/yarn/YarnResourceManager.java  |  3 +-
 12 files changed, 141 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 05d7e1f..6335745 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -192,7 +192,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
        protected ActorRef createTaskRouter() {
+               // pass this actor as the manager, so that the Tasks router forwards Reconcile and TaskTerminated messages to it
                return context().actorOf(
-                       Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
+                       Tasks.createActorProps(Tasks.class, self(), config, schedulerDriver, TaskMonitor.class),
                        "tasks");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 445010b..9a2ad42 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -197,7 +196,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
        protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
+               // pass selfActor as the manager, so that the Tasks router forwards Reconcile and TaskTerminated messages to this resource manager
                return actorSystem.actorOf(
-                       Tasks.createActorProps(Tasks.class, flinkConfig, schedulerDriver, TaskMonitor.class),
+                       Tasks.createActorProps(Tasks.class, selfActor, flinkConfig, schedulerDriver, TaskMonitor.class),
                        "tasks");
        }
 
@@ -422,8 +421,45 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
        }
 
+       /**
+        * Releases the worker with the given resource ID.
+        *
+        * <p>A launched worker is persisted as released, moved from {@code workersInLaunch} to
+        * {@code workersBeingReturned}, and announced to the task monitor and (if it has a host)
+        * to the launch coordinator. Requests for workers that are already being returned are
+        * ignored, requests for unknown workers are logged, and failures are escalated as fatal errors.
+        */
        @Override
-       public void stopWorker(InstanceID instanceId) {
-               // TODO implement worker release
+       public void stopWorker(ResourceID resourceID) {
+               LOG.info("Stopping worker {}.", resourceID);
+               try {
+                       final MesosWorkerStore.Worker launched = workersInLaunch.get(resourceID);
+                       if (launched != null) {
+                               // persist the Released state first, so that the in-memory bookkeeping
+                               // is only updated once the worker store has accepted the change
+                               final MesosWorkerStore.Worker released = launched.releaseWorker();
+                               workerStore.putWorker(released);
+
+                               workersInLaunch.remove(resourceID);
+                               workersBeingReturned.put(extractResourceID(released.taskID()), released);
+
+                               // notify the task monitor of the worker's new goal state
+                               taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(released)), selfActor);
+
+                               if (released.hostname().isDefined()) {
+                                       // tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+                                       launchCoordinator.tell(new LaunchCoordinator.Unassign(released.taskID(), released.hostname().get()), selfActor);
+                               }
+                       }
+                       else if (workersBeingReturned.containsKey(resourceID)) {
+                               LOG.info("Ignoring request to stop worker {} because it is already being stopped.", resourceID);
+                       }
+                       else {
+                               LOG.warn("Unrecognized worker {}.", resourceID);
+                       }
+               }
+               catch (Exception e) {
+                       onFatalErrorAsync(new ResourceManagerException("Unable to release a worker.", e));
+               }
        }
 
        /**
@@ -596,8 +621,6 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        assert(launched != null);
                        LOG.info("Worker {} failed with status: {}, reason: {}, 
message: {}.",
                                id, status.getState(), status.getReason(), 
status.getMessage());
-
-                       // TODO : launch a replacement worker?
                }
 
                closeTaskManagerConnection(id, new 
Exception(status.getMessage()));

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala
index 4f49c16..54d1bd2 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala
@@ -34,6 +34,7 @@ import scala.collection.mutable.{Map => MutableMap}
   * Routes messages between the scheduler and individual task monitor actors.
   */
 class Tasks(
+     manager: ActorRef,
      flinkConfig: Configuration,
      schedulerDriver: SchedulerDriver,
      taskMonitorCreator: (ActorRefFactory,TaskGoalState) => ActorRef) extends 
Actor {
@@ -92,11 +93,11 @@ class Tasks(
       }
 
     case msg: Reconcile =>
-      context.parent.forward(msg)
+      manager.forward(msg)
 
     case msg: TaskTerminated =>
       taskMap.remove(msg.taskID)
-      context.parent.forward(msg)
+      manager.forward(msg)
   }
 
   private def createTask(task: TaskGoalState): ActorRef = {
@@ -113,6 +114,7 @@ object Tasks {
     */
   def createActorProps[T <: Tasks, M <: TaskMonitor](
       actorClass: Class[T],
+      manager: ActorRef,
       flinkConfig: Configuration,
       schedulerDriver: SchedulerDriver,
       taskMonitorClass: Class[M]): Props = {
@@ -122,6 +124,6 @@ object Tasks {
       factory.actorOf(props)
     }
 
-    Props.create(actorClass, flinkConfig, schedulerDriver, taskMonitorCreator)
+    Props.create(actorClass, manager, flinkConfig, schedulerDriver, 
taskMonitorCreator)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 4bbcb25..cf0c913 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -78,6 +78,7 @@ import org.apache.mesos.SchedulerDriver;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -330,9 +331,13 @@ public class MesosResourceManagerTest extends TestLogger {
                                when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
                        }
 
-                       public void grantLeadership() {
+                       /**
+                        * Grants leadership to the resource manager under a fresh random session ID and blocks
+                        * until that leadership is confirmed, failing if this takes longer than {@code timeout}.
+                        */
+                       public void grantLeadership() throws Exception {
                                rmLeaderSessionId = UUID.randomUUID();
-                               rmLeaderElectionService.isLeader(rmLeaderSessionId);
+                               rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                        }
                }
 
@@ -454,7 +455,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        MesosWorkerStore.Worker expected = 
MesosWorkerStore.Worker.newWorker(taskID, resourceProfile);
 
                        // drain the probe messages
-                       verify(rmServices.workerStore).putWorker(expected);
+                       verify(rmServices.workerStore, 
Mockito.timeout(timeout.toMilliseconds())).putWorker(expected);
                        assertThat(resourceManager.workersInNew, 
hasEntry(extractResourceID(taskID), expected));
                        
resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
                        
resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
@@ -531,7 +532,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        // verify that a new worker was persisted, the internal 
state was updated, the task router was notified,
                        // and the launch coordinator was asked to launch a task
                        MesosWorkerStore.Worker expected = 
MesosWorkerStore.Worker.newWorker(task1, resourceProfile1);
-                       verify(rmServices.workerStore).putWorker(expected);
+                       verify(rmServices.workerStore, 
Mockito.timeout(timeout.toMilliseconds())).putWorker(expected);
                        assertThat(resourceManager.workersInNew, 
hasEntry(extractResourceID(task1), expected));
                        
resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
                        
resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
@@ -617,7 +618,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        // send registration message
                        CompletableFuture<RegistrationResponse> 
successfulFuture =
                                
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, 
task1Executor.address, task1Executor.resourceID, slotReport, timeout);
-                       RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
+                       RegistrationResponse response = 
successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                        assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
 
                        // verify the internal state
@@ -653,6 +654,39 @@ public class MesosResourceManagerTest extends TestLogger {
        }
 
        /**
+        * Test planned stop of a launched worker.
+        *
+        * <p>Verifies that the worker is persisted as released, moved from {@code workersInLaunch} to
+        * {@code workersBeingReturned}, and that the task router and launch coordinator are notified.
+        */
+       @Test
+       public void testStopWorker() throws Exception {
+               new Context() {{
+                       // set the initial persistent state with a launched worker
+                       MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+                       when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+                       when(rmServices.workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
+                       startResourceManager();
+
+                       // drain the assign message
+                       resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
+
+                       // tell the RM to stop the worker
+                       resourceManager.stopWorker(extractResourceID(task1));
+
+                       // verify that the instance state was updated
+                       MesosWorkerStore.Worker worker1Released = worker1launched.releaseWorker();
+                       verify(rmServices.workerStore).putWorker(worker1Released);
+                       assertThat(resourceManager.workersInLaunch.entrySet(), empty());
+                       assertThat(resourceManager.workersBeingReturned, hasEntry(extractResourceID(task1), worker1Released));
+
+                       // verify that the monitor was notified
+                       resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
+                       resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Unassign.class);
+               }};
+       }
+
+       /**
         * Test application shutdown handling.
         */
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala 
b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
index fcf2977..b3d9a5f 100644
--- 
a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
+++ 
b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TasksTest.scala
@@ -93,7 +93,7 @@ class TasksTest
         taskActorRef
       }
       TestActorRef[Tasks](
-        Props(classOf[Tasks], config, schedulerDriver, taskActorCreator),
+        Props(classOf[Tasks], testActor, config, schedulerDriver, 
taskActorCreator),
         testActor,
         TestFSMUtils.randomName)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index e8ec0e0..a9a9e50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -944,7 +944,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
        @VisibleForTesting
        public abstract void startNewWorker(ResourceProfile resourceProfile);
 
-       public abstract void stopWorker(InstanceID instanceId);
+       /**
+        * Stops the worker identified by the given resource ID and deallocates its resources.
+        *
+        * @param resourceID The resource ID of the worker to stop
+        */
+       public abstract void stopWorker(ResourceID resourceID);
 
        /**
         * Callback when a worker was started.
@@ -960,12 +965,24 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
                @Override
                public void releaseResource(InstanceID instanceId) {
-                       stopWorker(instanceId);
+                       // scheduled onto the main thread via runAsync
+                       runAsync(() -> {
+                               // look up the TaskExecutor registered under the given instance ID and stop it
+                               for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> registration : taskExecutors.entrySet()) {
+                                       if (registration.getValue().getInstanceID().equals(instanceId)) {
+                                               stopWorker(registration.getKey());
+                                               return;
+                                       }
+                               }
+
+                               log.warn("Ignoring request to release TaskManager with instance ID {} (not found).", instanceId);
+                       });
                }
 
                @Override
                public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
-                       startNewWorker(resourceProfile);
+                       // scheduled onto the main thread via runAsync
+                       runAsync(() -> startNewWorker(resourceProfile));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index a921a29..ac2c745 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -76,7 +75,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
        }
 
        @Override
-       public void stopWorker(InstanceID instanceId) {
+       public void stopWorker(ResourceID resourceID) {
+               // intentionally a no-op: this resource manager does not stop workers
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 3bda409..5218286 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -836,10 +836,13 @@ public class SlotManager implements AutoCloseable {
 
                        while (taskManagerRegistrationIterator.hasNext()) {
                                TaskManagerRegistration taskManagerRegistration 
= taskManagerRegistrationIterator.next().getValue();
+                               LOG.debug("Evaluating TaskManager {} for 
idleness.", taskManagerRegistration.getInstanceId());
 
                                if 
(anySlotUsed(taskManagerRegistration.getSlots())) {
                                        taskManagerRegistration.markUsed();
                                } else if (currentTime - 
taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
+                                       LOG.info("Removing idle TaskManager {} 
from the SlotManager.", taskManagerRegistration.getInstanceId());
+
                                        
taskManagerRegistrationIterator.remove();
 
                                        
internalUnregisterTaskManager(taskManagerRegistration);

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 7d3764c..f19f9bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -68,7 +68,14 @@ public class TaskManagerRegistration {
        }
 
+       /**
+        * Marks this TaskManager as idle. If {@link #isIdle()} already returns true, {@code idleSince}
+        * is left unchanged, so that repeated calls do not keep postponing the idle timeout that is
+        * computed from {@code idleSince}.
+        */
        public void markIdle() {
-               idleSince = System.currentTimeMillis();
+               if (!isIdle()) {
+                       idleSince = System.currentTimeMillis();
+               }
        }
 
        public void markUsed() {

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 9ad251b..737cede 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -629,7 +629,7 @@ public class ResourceManagerTest extends TestLogger {
 
                        final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
 
-                       rmLeaderElectionService.isLeader(rmLeaderId);
+                       
rmLeaderElectionService.isLeader(rmLeaderId).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
                        // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
                        CompletableFuture<RegistrationResponse> 
successfulFuture = rmGateway.registerJobManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index d456083..d951db5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.leaderelection;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Test {@link LeaderElectionService} implementation which directly forwards 
isLeader and notLeader
@@ -28,43 +29,79 @@ public class TestingLeaderElectionService implements LeaderElectionService {
 
        private LeaderContender contender;
        private boolean hasLeadership = false;
+       private CompletableFuture<UUID> confirmationFuture = null;
+
+       /**
+        * Gets a future that completes when leadership is confirmed.
+        *
+        * <p>Note: the future is created upon calling {@link #isLeader(UUID)} and cleared by {@link #reset()}.
+        */
+       public synchronized CompletableFuture<UUID> getConfirmationFuture() {
+               return confirmationFuture;
+       }
 
        @Override
-       public void start(LeaderContender contender) throws Exception {
+       public synchronized void start(LeaderContender contender) throws Exception {
                this.contender = contender;
        }
 
        @Override
-       public void stop() throws Exception {
+       public synchronized void stop() throws Exception {
 
        }
 
        @Override
-       public void confirmLeaderSessionID(UUID leaderSessionID) {
-
+       public synchronized void confirmLeaderSessionID(UUID leaderSessionID) {
+               if (confirmationFuture != null) {
+                       confirmationFuture.complete(leaderSessionID);
+               }
        }
 
        @Override
-       public boolean hasLeadership() {
+       public synchronized boolean hasLeadership() {
                return hasLeadership;
        }
 
-       public void isLeader(UUID leaderSessionID) {
+       /**
+        * Grants leadership to the registered contender under the given session ID.
+        *
+        * <p>A confirmation future left over from a previous call is cancelled and replaced. Note that
+        * the contender is called back while this service's lock is held.
+        *
+        * @return a future that completes with the ID passed to {@link #confirmLeaderSessionID(UUID)}
+        */
+       public synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionID) {
+               if (confirmationFuture != null) {
+                       confirmationFuture.cancel(false);
+               }
+               confirmationFuture = new CompletableFuture<>();
                hasLeadership = true;
                contender.grantLeadership(leaderSessionID);
+
+               return confirmationFuture;
        }
 
-       public void notLeader() {
+       /**
+        * Revokes leadership from the registered contender.
+        */
+       public synchronized void notLeader() {
                hasLeadership = false;
                contender.revokeLeadership();
        }
 
-       public void reset() {
+       /**
+        * Clears the contender and the leadership flag, and cancels any outstanding confirmation future.
+        */
+       public synchronized void reset() {
                contender = null;
                hasLeadership  = false;
+               if (confirmationFuture != null) {
+                       confirmationFuture.cancel(false);
+                       confirmationFuture = null;
+               }
        }
 
-       public String getAddress() {
+       public synchronized String getAddress() {
                return contender.getAddress();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40656c5d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index fb1a1c3..c3398c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -228,7 +227,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
        }
 
        @Override
-       public void stopWorker(InstanceID instanceId) {
+       public void stopWorker(ResourceID resourceID) {
                // TODO: Implement to stop the worker
+               // NOTE: until this is implemented, requests to stop a worker have no effect
        }
 

Reply via email to