[FLINK-4152] Allow re-registration of TMs at resource manager

- Add YarnFlinkResourceManager test to reaccept task manager registrations from 
a re-elected job manager

- Remove unnecessary sync logic between JobManager and ResourceManager

- Avoid duplicate registration attempts in case of a refused registration

- Add test case to check that no excessive number of RegisterTaskManager 
messages is sent

- Remove containersLaunched from YarnFlinkResourceManager and instead do not 
clear registeredWorkers when JobManager loses leadership

- Let YarnFlinkResourceManagerTest extend TestLogger

- Harden YarnFlinkResourceManager.getContainersFromPreviousAttempts

- Add FatalErrorOccurred message handler to FlinkResourceManager;
  Increase timeout for YarnFlinkResourceManagerTest;
  Add additional constructor to TestingYarnFlinkResourceManager for tests

- Rename registeredWorkers field to startedWorkers
Additionally, the RegisterResource message is renamed to 
NotifyResourceStarted, which
tells the RM that a resource has been started. This reflects the current 
semantics of
the startedWorkers map in the resource manager.

- Fix concurrency issues in TestingLeaderRetrievalService

This closes #2257


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2648bc1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2648bc1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2648bc1a

Branch: refs/heads/master
Commit: 2648bc1a5a5faed8c2061bcab40a8949fd02751c
Parents: c6715b7
Author: Till Rohrmann <[email protected]>
Authored: Fri Jul 15 10:51:59 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Tue Jul 26 16:39:22 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  34 ++-
 .../clusterframework/FlinkResourceManager.java  | 119 ++++----
 .../messages/NotifyResourceStarted.java         |  47 +++
 .../messages/RegisterResource.java              |  55 ----
 .../messages/RegisterResourceFailed.java        |  68 -----
 .../messages/RegisterResourceSuccessful.java    |  58 ----
 .../standalone/StandaloneResourceManager.java   |   4 +-
 .../testutils/TestingResourceManager.java       |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  51 +---
 .../runtime/messages/RegistrationMessages.scala |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 202 +++++++++----
 .../taskmanager/TaskManagerConfiguration.scala  |  27 +-
 .../testingUtils/TestingJobManagerLike.scala    |   5 +-
 .../TestingLeaderRetrievalService.java          |   9 +-
 .../resourcemanager/ResourceManagerTest.java    |  66 +---
 .../TaskManagerRegistrationTest.java            | 107 ++++++-
 flink-yarn-tests/pom.xml                        |   8 +
 .../yarn/TestingYarnClusterDescriptor.java      |   5 +
 .../yarn/TestingYarnFlinkResourceManager.java   |  56 ----
 flink-yarn/pom.xml                              |  31 +-
 .../flink/yarn/RegisteredYarnWorkerNode.java    |   1 -
 .../flink/yarn/YarnFlinkResourceManager.java    | 113 +++++--
 .../YarnResourceManagerCallbackHandler.java     |  33 +-
 .../yarn/TestingYarnFlinkResourceManager.java   | 111 +++++++
 .../yarn/YarnFlinkResourceManagerTest.java      | 298 +++++++++++++++++++
 .../messages/NotifyWhenResourcesRegistered.java |  32 ++
 .../RequestNumberOfRegisteredResources.java     |  25 ++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-yarn/src/test/resources/logback-test.xml  |  34 +++
 29 files changed, 1127 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index e40bed3..028732a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -243,11 +243,28 @@ public final class ConfigConstants {
        public static final String 
TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 
"taskmanager.debug.memory.logIntervalMs";
 
        /**
-        *
+        * Defines the maximum time it can take for the TaskManager 
registration. If the duration is
+        * exceeded without a successful registration, then the TaskManager 
terminates.
         */
        public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = 
"taskmanager.maxRegistrationDuration";
 
        /**
+        * The initial registration pause between two consecutive registration 
attempts. The pause
+        * is doubled for each new registration attempt until it reaches the 
maximum registration pause.
+        */
+       public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = 
"taskmanager.initial-registration-pause";
+
+       /**
+        * The maximum registration pause between two consecutive registration 
attempts.
+        */
+       public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = 
"taskmanager.max-registration-pause";
+
+       /**
+        * The pause after a registration has been refused by the job manager 
before retrying to connect.
+        */
+       public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = 
"taskmanager.refused-registration-pause";
+
+       /**
         * Time interval between two successive task cancellation attempts in 
milliseconds.
         */
        @PublicEvolving
@@ -788,6 +805,21 @@ public final class ConfigConstants {
        public static final String 
DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
 
        /**
+        * The default task manager's initial registration pause.
+        */
+       public static final String 
DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
+
+       /**
+        * The default task manager's maximum registration pause.
+        */
+       public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE 
= "30 s";
+
+       /**
+        * The default task manager's refused registration pause.
+        */
+       public static final String 
DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";
+
+       /**
         * The default setting for TaskManager memory eager allocation of 
managed memory
         */
        public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = 
false;

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index d28d4aa..95be084 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -34,10 +34,8 @@ import 
org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContai
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
@@ -51,10 +49,9 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import scala.concurrent.Future;
@@ -89,7 +86,7 @@ import static java.util.Objects.requireNonNull;
  *     <li>At some point, the TaskManager processes will have started and send 
a registration
  *         message to the JobManager. The JobManager will perform
  *         a lookup with the ResourceManager to check if it really started 
this TaskManager.
- *         The method {@link #workerRegistered(ResourceID)} will be called
+ *         The method {@link #workerStarted(ResourceID)} will be called
  *         to inform about a registered worker.</li>
  * </ol>
  *
@@ -113,8 +110,9 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
        /** The service to find the right leader JobManager (to support high 
availability) */
        private final LeaderRetrievalService leaderRetriever;
 
-       /** The currently registered resources */
-       private final Map<ResourceID, WorkerType> registeredWorkers;
+       /** Map which contains the workers from which we know that they have 
been successfully started
+        * in a container. This notification is sent by the JM when a TM tries 
to register at it. */
+       private final Map<ResourceID, WorkerType> startedWorkers;
 
        /** List of listeners for info messages */
        private final Set<ActorRef> infoMessageListeners;
@@ -141,7 +139,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                        LeaderRetrievalService leaderRetriever) {
                this.config = requireNonNull(flinkConfig);
                this.leaderRetriever = requireNonNull(leaderRetriever);
-               this.registeredWorkers = new HashMap<>();
+               this.startedWorkers = new HashMap<>();
 
                FiniteDuration lt;
                try {
@@ -230,9 +228,9 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
 
                        // --- lookup of registered resources
 
-                       else if (message instanceof RegisterResource) {
-                               RegisterResource msg = (RegisterResource) 
message;
-                               handleRegisterResource(sender(), 
msg.getTaskManager(), msg.getRegisterMessage());
+                       else if (message instanceof NotifyResourceStarted) {
+                               NotifyResourceStarted msg = 
(NotifyResourceStarted) message;
+                               handleResourceStarted(sender(), 
msg.getResourceID());
                        }
 
                        // --- messages about JobManager leader status and 
registration
@@ -273,6 +271,11 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                                infoMessageListeners.remove(sender());
                        }
 
+                       else if (message instanceof FatalErrorOccurred) {
+                               FatalErrorOccurred fatalErrorOccurred = 
(FatalErrorOccurred) message;
+                               fatalError(fatalErrorOccurred.message(), 
fatalErrorOccurred.error());
+                       }
+
                        // --- unknown messages
 
                        else {
@@ -307,73 +310,68 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
        }
 
        /**
-        * Gets the number of currently registered TaskManagers.
+        * Gets the number of currently started TaskManagers.
         *
-        * @return The number of currently registered TaskManagers.
+        * @return The number of currently started TaskManagers.
         */
-       public int getNumberOfRegisteredTaskManagers() {
-               return registeredWorkers.size();
+       public int getNumberOfStartedTaskManagers() {
+               return startedWorkers.size();
        }
 
        /**
         * Gets the currently registered resources.
         * @return
         */
-       public Collection<WorkerType> getRegisteredTaskManagers() {
-               return registeredWorkers.values();
+       public Collection<WorkerType> getStartedTaskManagers() {
+               return startedWorkers.values();
        }
 
        /**
-        * Gets the registered worker for a given resource ID, if one is 
available.
+        * Gets the started worker for a given resource ID, if one is available.
         *
         * @param resourceId The resource ID for the worker.
         * @return True if already registered, otherwise false
         */
-       public boolean isRegistered(ResourceID resourceId) {
-               return registeredWorkers.containsKey(resourceId);
+       public boolean isStarted(ResourceID resourceId) {
+               return startedWorkers.containsKey(resourceId);
        }
 
        /**
-        * Gets an iterable for all currently registered TaskManagers.
+        * Gets an iterable for all currently started TaskManagers.
         *
-        * @return All currently registered TaskManagers.
+        * @return All currently started TaskManagers.
         */
-       public Collection<WorkerType> allRegisteredWorkers() {
-               return registeredWorkers.values();
+       public Collection<WorkerType> allStartedWorkers() {
+               return startedWorkers.values();
        }
 
        /**
-        * Register a resource on which a TaskManager has been started
+        * Tells the ResourceManager that a TaskManager had been started in a 
container with the given
+        * resource id.
+        *
         * @param jobManager The sender (JobManager) of the message
-        * @param taskManager The task manager who wants to register
-        * @param msg The task manager's registration message
+        * @param resourceID The resource id of the started TaskManager
         */
-       private void handleRegisterResource(ActorRef jobManager, ActorRef 
taskManager,
-                               RegistrationMessages.RegisterTaskManager msg) {
-
-               ResourceID resourceID = msg.resourceId();
-               try {
-                       Preconditions.checkNotNull(resourceID);
+       private void handleResourceStarted(ActorRef jobManager, ResourceID 
resourceID) {
+               if (resourceID != null) {
                        // check if resourceID is already registered 
(TaskManager may send duplicate register messages)
-                       WorkerType oldWorker = 
registeredWorkers.get(resourceID);
+                       WorkerType oldWorker = startedWorkers.get(resourceID);
                        if (oldWorker != null) {
-                               LOG.debug("TaskManager {} had been registered 
before.", resourceID);
+                               LOG.debug("Notification that TaskManager {} had 
been started was sent before.", resourceID);
                        } else {
-                               WorkerType newWorker = 
workerRegistered(resourceID);
-                               registeredWorkers.put(resourceID, newWorker);
-                               LOG.info("TaskManager {} has registered.", 
resourceID);
-                       }
-                       jobManager.tell(decorateMessage(
-                               new RegisterResourceSuccessful(taskManager, 
msg)),
-                               self());
-               } catch (Exception e) {
-                       LOG.warn("TaskManager resource registration failed for 
{}", resourceID, e);
+                               WorkerType newWorker = 
workerStarted(resourceID);
 
-                       // tell the JobManager about the failure
-                       String eStr = ExceptionUtils.stringifyException(e);
-                       sender().tell(decorateMessage(
-                               new RegisterResourceFailed(taskManager, 
resourceID, eStr)), self());
+                               if (newWorker != null) {
+                                       startedWorkers.put(resourceID, 
newWorker);
+                                       LOG.info("TaskManager {} has started.", 
resourceID);
+                               } else {
+                                       LOG.info("TaskManager {} has not been 
started by this resource manager.", resourceID);
+                               }
+                       }
                }
+
+               // Acknowledge the resource registration
+               jobManager.tell(decorateMessage(Acknowledge.get()), self());
        }
 
        /**
@@ -384,9 +382,9 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
         */
        private void removeRegisteredResource(ResourceID resourceId) {
 
-               WorkerType worker = registeredWorkers.remove(resourceId);
+               WorkerType worker = startedWorkers.remove(resourceId);
                if (worker != null) {
-                       releaseRegisteredWorker(worker);
+                       releaseStartedWorker(worker);
                } else {
                        LOG.warn("Resource {} could not be released", 
resourceId);
                }
@@ -463,8 +461,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
        }
 
        /**
-        * This method disassociates from the current leader JobManager. All 
currently registered
-        * TaskManagers are put under "awaiting registration".
+        * This method disassociates from the current leader JobManager.
         */
        private void jobManagerLostLeadership() {
                if (jobManager != null) {
@@ -474,8 +471,6 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                        leaderSessionID = null;
 
                        infoMessageListeners.clear();
-
-                       registeredWorkers.clear();
                }
        }
 
@@ -510,7 +505,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                                        // put the consolidated TaskManagers 
into our bookkeeping
                                        for (WorkerType worker : consolidated) {
                                                ResourceID resourceID = 
worker.getResourceID();
-                                               
registeredWorkers.put(resourceID, worker);
+                                               startedWorkers.put(resourceID, 
worker);
                                                toHandle.remove(resourceID);
                                        }
                                }
@@ -568,7 +563,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                        "Number of pending workers pending registration should 
never be below 0.");
 
                // see how many workers we want, and whether we have enough
-               int allAvailableAndPending = registeredWorkers.size() +
+               int allAvailableAndPending = startedWorkers.size() +
                        numWorkersPending + numWorkersPendingRegistration;
 
                int missing = designatedPoolSize - allAvailableAndPending;
@@ -619,7 +614,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
         * @param message An informational message that explains why the worker 
failed.
         */
        public void notifyWorkerFailed(ResourceID resourceID, String message) {
-               WorkerType worker = registeredWorkers.remove(resourceID);
+               WorkerType worker = startedWorkers.remove(resourceID);
                if (worker != null) {
                        jobManager.tell(
                                decorateMessage(
@@ -676,16 +671,16 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
        protected abstract void releasePendingWorker(ResourceID resourceID);
 
        /**
-        * Trigger a release of a registered worker.
+        * Trigger a release of a started worker.
         * @param resourceID The worker resource id
         */
-       protected abstract void releaseRegisteredWorker(WorkerType resourceID);
+       protected abstract void releaseStartedWorker(WorkerType resourceID);
 
        /**
-        * Callback when a worker was registered.
+        * Callback when a worker was started.
         * @param resourceID The worker resource id
         */
-       protected abstract WorkerType workerRegistered(ResourceID resourceID) 
throws Exception;
+       protected abstract WorkerType workerStarted(ResourceID resourceID);
 
        /**
         * This method is called when the resource manager starts after a 
failure and reconnects to

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
new file mode 100644
index 0000000..1427ba8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.messages;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * Notifies the ResourceManager that a TaskManager has been started in a 
container with the given
+ * resource id.
+ */
+public class NotifyResourceStarted implements RequiresLeaderSessionID, 
java.io.Serializable {
+       private static final long serialVersionUID = 1L;
+
+       private final ResourceID resourceID;
+
+       public NotifyResourceStarted(ResourceID resourceID) {
+               this.resourceID = resourceID;
+       }
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+       @Override
+       public String toString() {
+               return "NotifyResourceStarted{" +
+                       ", resourceID=" + resourceID +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
deleted file mode 100644
index bad51f0..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Triggers a lookup at the ResourceManager to check if the resource for a 
TaskManager is registered.
- */
-public class RegisterResource implements RequiresLeaderSessionID, 
java.io.Serializable {
-       private static final long serialVersionUID = 1L;
-
-       private final ActorRef taskManager;
-       private final RegistrationMessages.RegisterTaskManager registerMessage;
-
-
-       public RegisterResource(ActorRef taskManager, 
RegistrationMessages.RegisterTaskManager registerMessage) {
-               this.taskManager = taskManager;
-               this.registerMessage = registerMessage;
-       }
-
-       public ActorRef getTaskManager() {
-               return taskManager;
-       }
-
-       public RegistrationMessages.RegisterTaskManager getRegisterMessage() {
-               return registerMessage;
-       }
-
-       @Override
-       public String toString() {
-               return "RegisterResource{" +
-                       "taskManager=" + taskManager +
-                       ", registerMessage=" + registerMessage +
-                       '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
deleted file mode 100644
index a19c0ab..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Answer to RegisterResource to indicate that the requested resource is 
unknown.
- * Sent by the ResourceManager to the JobManager.
- */
-public class RegisterResourceFailed implements RequiresLeaderSessionID, 
java.io.Serializable {
-       private static final long serialVersionUID = 1L;
-
-       /** Task Manager which tried to register */
-       private final ActorRef taskManager;
-
-       /** The id of the task manager resource */
-       private final ResourceID resourceID;
-
-       /** Error message */
-       private final String message;
-
-       public RegisterResourceFailed(ActorRef taskManager, ResourceID 
resourceId, String message) {
-               this.taskManager = taskManager;
-               this.resourceID = resourceId;
-               this.message = message;
-       }
-
-
-       public String getMessage() {
-               return message;
-       }
-
-       public ActorRef getTaskManager() {
-               return taskManager;
-       }
-
-       public ResourceID getResourceID() {
-               return resourceID;
-       }
-
-       @Override
-       public String toString() {
-               return "RegisterResourceFailed{" +
-                       "taskManager=" + taskManager +
-                       ", resourceID=" + resourceID +
-                       ", message='" + message + '\'' +
-                       '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
deleted file mode 100644
index c29d28d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-
-/**
- * Answer to RegisterResource to indicate that the requested resource is known.
- * Sent by the ResourceManager to the JobManager.
- */
-public class RegisterResourceSuccessful implements RequiresLeaderSessionID, 
java.io.Serializable {
-       private static final long serialVersionUID = 1L;
-
-       private final ActorRef taskManager;
-       private final RegistrationMessages.RegisterTaskManager 
registrationMessage;
-
-       public RegisterResourceSuccessful(ActorRef taskManager,
-                       RegistrationMessages.RegisterTaskManager 
registrationMessage) {
-               this.taskManager = taskManager;
-               this.registrationMessage = registrationMessage;
-       }
-
-
-       public ActorRef getTaskManager() {
-               return taskManager;
-       }
-
-       public RegistrationMessages.RegisterTaskManager 
getRegistrationMessage() {
-               return registrationMessage;
-       }
-
-       @Override
-       public String toString() {
-               return "RegisterResourceSuccessful{" +
-                       "taskManager=" + taskManager +
-                       ", registrationMessage=" + registrationMessage +
-                       '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
index 4626461..89a602e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
@@ -83,13 +83,13 @@ public class StandaloneResourceManager extends 
FlinkResourceManager<ResourceID>
        }
 
        @Override
-       protected ResourceID workerRegistered(ResourceID resourceID) {
+       protected ResourceID workerStarted(ResourceID resourceID) {
                // we accept everything
                return resourceID;
        }
 
        @Override
-       protected void releaseRegisteredWorker(ResourceID resourceID) {
+       protected void releaseStartedWorker(ResourceID resourceID) {
                // cannot release any workers, they simply stay
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
index 2422925..495cacd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -58,7 +58,7 @@ public class TestingResourceManager extends 
StandaloneResourceManager {
        protected void handleMessage(Object message) {
 
                if (message instanceof GetRegisteredResources) {
-                       sender().tell(new 
GetRegisteredResourcesReply(getRegisteredTaskManagers()), self());
+                       sender().tell(new 
GetRegisteredResourcesReply(getStartedTaskManagers()), self());
                } else if (message instanceof FailResource) {
                        ResourceID resourceID = ((FailResource) 
message).resourceID;
                        notifyWorkerFailed(resourceID, "Failed for test case.");

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0026bef..f14a37f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -400,36 +400,23 @@ class JobManager(
 
       currentResourceManager match {
         case Some(rm) =>
-          val future = (rm ? decorateMessage(new RegisterResource(taskManager, 
msg)))(timeout)
-          future.onComplete {
-            case scala.util.Success(response) =>
-              // the resource manager is available and answered
-              self ! response
-            case scala.util.Failure(t) =>
+          val future = (rm ? decorateMessage(new 
NotifyResourceStarted(msg.resourceId)))(timeout)
+          future.onFailure {
+            case t: Throwable =>
               t match {
                 case _: TimeoutException =>
                   log.info("Attempt to register resource at ResourceManager 
timed out. Retrying")
                 case _ =>
                   log.warn("Failure while asking ResourceManager for 
RegisterResource. Retrying", t)
               }
-              // slow or unreachable resource manager, register anyway and let 
the rm reconnect
-              self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
               self ! decorateMessage(new ReconnectResourceManager(rm))
           }(context.dispatcher)
 
         case None =>
           log.info("Task Manager Registration but not connected to 
ResourceManager")
-          // ResourceManager not yet available
-          // sending task manager information later upon ResourceManager 
registration
-          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, 
msg))
       }
 
-    case msg: RegisterResourceSuccessful =>
-
-      val originalMsg = msg.getRegistrationMessage
-      val taskManager = msg.getTaskManager
-
-      // ResourceManager knows about the resource, now let's try to register 
TaskManager
+      // ResourceManager is told about the resource, now let's try to register 
TaskManager
       if (instanceManager.isRegistered(taskManager)) {
         val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
@@ -441,10 +428,10 @@ class JobManager(
         try {
           val instanceID = instanceManager.registerTaskManager(
             taskManager,
-            originalMsg.resourceId,
-            originalMsg.connectionInfo,
-            originalMsg.resources,
-            originalMsg.numberOfSlots,
+            resourceId,
+            connectionInfo,
+            hardwareInformation,
+            numberOfSlots,
             leaderSessionID.orNull)
 
           taskManager ! decorateMessage(
@@ -463,24 +450,18 @@ class JobManager(
         }
       }
 
-    case msg: RegisterResourceFailed =>
-
-      val taskManager = msg.getTaskManager
-      val resourceId = msg.getResourceID
-      log.warn(s"TaskManager's resource id $resourceId failed to register at 
ResourceManager. " +
-        s"Refusing registration because of\n${msg.getMessage}.")
-
-      taskManager ! decorateMessage(
-        RefuseRegistration(new IllegalStateException(
-            s"Resource $resourceId not registered with resource manager.")))
-
     case msg: ResourceRemoved =>
       // we're being informed by the resource manager that a resource has 
become unavailable
       val resourceID = msg.resourceId()
       log.debug(s"Resource has been removed: $resourceID")
-      val instance = instanceManager.getRegisteredInstance(resourceID)
-      // trigger removal of task manager
-      handleTaskManagerTerminated(instance.getActorGateway.actor())
+
+      Option(instanceManager.getRegisteredInstance(resourceID)) match {
+        case Some(instance) =>
+          // trigger removal of task manager
+          handleTaskManagerTerminated(instance.getActorGateway.actor())
+        case None =>
+          log.debug(s"Resource $resourceID has not been registered at job 
manager.")
+      }
 
     case RequestNumberRegisteredTaskManager =>
       sender ! 
decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers)

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index b48bcf9..d362164 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.messages
 
+import java.util.UUID
+
 import akka.actor.ActorRef
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, 
HardwareDescription}
+import org.apache.flink.runtime.instance.{HardwareDescription, 
InstanceConnectionInfo, InstanceID}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
@@ -42,12 +44,14 @@ object RegistrationMessages {
    * @param timeout The timeout for the message. The next retry will double 
this timeout.
    * @param deadline Optional deadline until when the registration must be 
completed.
    * @param attempt The attempt number, for logging.
+   * @param registrationRun UUID of the current registration run to filter out 
outdated runs
    */
   case class TriggerTaskManagerRegistration(
       jobManagerURL: String,
       timeout: FiniteDuration,
       deadline: Option[Deadline],
-      attempt: Int)
+      attempt: Int,
+      registrationRun: UUID)
     extends RegistrationMessage
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index dcf1e38..a7dd789 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -189,6 +189,10 @@ class TaskManager(
        connectionInfo.getHostname(),
        new UnmodifiableConfiguration(config.configuration),
        config.tmpDirPaths)
+
+  private var scheduledTaskManagerRegistration: Option[Cancellable] = None
+  private var currentRegistrationRun: UUID = UUID.randomUUID()
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -563,54 +567,60 @@ class TaskManager(
         jobManagerURL,
         timeout,
         deadline,
-        attempt) =>
+        attempt,
+        registrationRun) =>
+
+        if (registrationRun.equals(this.currentRegistrationRun)) {
+          if (isConnected) {
+            // this may be the case, if we queue another attempt and
+            // in the meantime, the registration is acknowledged
+            log.debug(
+              "TaskManager was triggered to register at JobManager, but is 
already registered")
+          } else if (deadline.exists(_.isOverdue())) {
+            // we failed to register in time. that means we should quit
+            log.error("Failed to register at the JobManager withing the 
defined maximum " +
+                        "connect time. Shutting down ...")
+
+            // terminate ourselves (hasta la vista)
+            self ! decorateMessage(PoisonPill)
+          } else {
+            if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
+              throw new Exception("Invalid internal state: Trying to register 
at JobManager " +
+                                    s"$jobManagerURL even though the current 
JobManagerAkkaURL " +
+                                    s"is set to 
${jobManagerAkkaURL.getOrElse("")}")
+            }
 
-        if (isConnected) {
-          // this may be the case, if we queue another attempt and
-          // in the meantime, the registration is acknowledged
-          log.debug(
-            "TaskManager was triggered to register at JobManager, but is 
already registered")
-        } else if (deadline.exists(_.isOverdue())) {
-          // we failed to register in time. that means we should quit
-          log.error("Failed to register at the JobManager withing the defined 
maximum " +
-            "connect time. Shutting down ...")
-
-          // terminate ourselves (hasta la vista)
-          self ! decorateMessage(PoisonPill)
-        } else {
-          if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
-            throw new Exception("Invalid internal state: Trying to register at 
JobManager " +
-              s"$jobManagerURL even though the current JobManagerAkkaURL is 
set to " +
-              s"${jobManagerAkkaURL.getOrElse("")}")
-          }
+            log.info(s"Trying to register at JobManager $jobManagerURL " +
+                       s"(attempt $attempt, timeout: $timeout)")
+
+            val jobManager = context.actorSelection(jobManagerURL)
 
-          log.info(s"Trying to register at JobManager $jobManagerURL " +
-            s"(attempt $attempt, timeout: $timeout)")
-
-          val jobManager = context.actorSelection(jobManagerURL)
-
-          jobManager ! decorateMessage(
-            RegisterTaskManager(
-              resourceID,
-              connectionInfo,
-              resources,
-              numberOfSlots)
-          )
-
-          // the next timeout computes via exponential backoff with cap
-          val nextTimeout = (timeout * 
2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
-
-          // schedule (with our timeout s delay) a check triggers a new 
registration
-          // attempt, if we are not registered by then
-          context.system.scheduler.scheduleOnce(
-            timeout,
-            self,
-            decorateMessage(TriggerTaskManagerRegistration(
-              jobManagerURL,
-              nextTimeout,
-              deadline,
-              attempt + 1)
-            ))(context.dispatcher)
+            jobManager ! decorateMessage(
+              RegisterTaskManager(
+                resourceID,
+                connectionInfo,
+                resources,
+                numberOfSlots)
+            )
+
+            // the next timeout computes via exponential backoff with cap
+            val nextTimeout = (timeout * 2).min(config.maxRegistrationPause)
+
+            // schedule (with our timeout as delay) a check that triggers a new 
registration
+            // attempt, if we are not registered by then
+            scheduledTaskManagerRegistration = 
Option(context.system.scheduler.scheduleOnce(
+              timeout,
+              self,
+              decorateMessage(TriggerTaskManagerRegistration(
+                jobManagerURL,
+                nextTimeout,
+                deadline,
+                attempt + 1,
+                registrationRun)
+              ))(context.dispatcher))
+          }
+        } else {
+          log.info(s"Discarding registration run with ID $registrationRun")
         }
 
       // successful registration. associate with the JobManager
@@ -668,20 +678,27 @@ class TaskManager(
 
           if(jobManagerAkkaURL.isDefined) {
             // try the registration again after some time
-            val delay: FiniteDuration = 
TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+            val delay: FiniteDuration = config.refusedRegistrationPause
             val deadline: Option[Deadline] = 
config.maxRegistrationDuration.map {
               timeout => timeout + delay fromNow
             }
 
-            context.system.scheduler.scheduleOnce(delay) {
-              self ! decorateMessage(
-                TriggerTaskManagerRegistration(
-                  jobManagerAkkaURL.get,
-                  TaskManager.INITIAL_REGISTRATION_TIMEOUT,
-                  deadline,
-                  1)
-              )
-            }(context.dispatcher)
+            // start a new registration run
+            currentRegistrationRun = UUID.randomUUID()
+
+            scheduledTaskManagerRegistration.foreach(_.cancel())
+
+            scheduledTaskManagerRegistration = Option(
+              context.system.scheduler.scheduleOnce(delay) {
+                self ! decorateMessage(
+                  TriggerTaskManagerRegistration(
+                    jobManagerAkkaURL.get,
+                    config.initialRegistrationPause,
+                    deadline,
+                    1,
+                    currentRegistrationRun)
+                )
+              }(context.dispatcher))
           }
         } else {
           // ignore RefuseRegistration messages which arrived after 
AcknowledgeRegistration
@@ -1359,12 +1376,18 @@ class TaskManager(
       // begin attempts to reconnect
       val deadline: Option[Deadline] = 
config.maxRegistrationDuration.map(_.fromNow)
 
+      // start a new registration run
+      currentRegistrationRun = UUID.randomUUID()
+
+      scheduledTaskManagerRegistration.foreach(_.cancel())
+
       self ! decorateMessage(
         TriggerTaskManagerRegistration(
           jobManagerAkkaURL.get,
-          TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+          config.initialRegistrationPause,
           deadline,
-          1)
+          1,
+          currentRegistrationRun)
       )
     }
   }
@@ -1412,13 +1435,6 @@ object TaskManager {
     * connection attempts */
   val STARTUP_CONNECT_LOG_SUPPRESS = 10000L
 
-  /** The initial time for registration of the TaskManager with the JobManager 
*/
-  val INITIAL_REGISTRATION_TIMEOUT: FiniteDuration = 500 milliseconds
-  /** The maximum time for registration of the TaskManager with the JobManager 
*/
-  val MAX_REGISTRATION_TIMEOUT: FiniteDuration = 30 seconds
-
-  val DELAY_AFTER_REFUSED_REGISTRATION: FiniteDuration = 10 seconds
-
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
 
 
@@ -2116,13 +2132,67 @@ object TaskManager {
         e)
     }
 
+    val initialRegistrationPause = try {
+      val pause = Duration(configuration.getString(
+        ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+        ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE
+      ))
+
+      if (pause.isFinite()) {
+        pause.asInstanceOf[FiniteDuration]
+      } else {
+        throw new IllegalArgumentException(s"The initial registration pause 
must be finite: $pause")
+      }
+    } catch {
+      case e: NumberFormatException => throw new IllegalArgumentException(
+        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+        e)
+    }
+
+    val maxRegistrationPause = try {
+      val pause = Duration(configuration.getString(
+        ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+        ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE
+      ))
+
+      if (pause.isFinite()) {
+        pause.asInstanceOf[FiniteDuration]
+      } else {
+        throw new IllegalArgumentException(s"The maximum registration pause 
must be finite: $pause")
+      }
+    } catch {
+      case e: NumberFormatException => throw new IllegalArgumentException(
+        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+        e)
+    }
+
+    val refusedRegistrationPause = try {
+      val pause = Duration(configuration.getString(
+        ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+        ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE
+      ))
+
+      if (pause.isFinite()) {
+        pause.asInstanceOf[FiniteDuration]
+      } else {
+        throw new IllegalArgumentException(s"The refused registration pause 
must be finite: $pause")
+      }
+    } catch {
+      case e: NumberFormatException => throw new IllegalArgumentException(
+        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+        e)
+    }
+
     val taskManagerConfig = TaskManagerConfiguration(
       tmpDirs,
       cleanupInterval,
       timeout,
       finiteRegistrationDuration,
       slots,
-      configuration)
+      configuration,
+      initialRegistrationPause,
+      maxRegistrationPause,
+      refusedRegistrationPause)
 
     (taskManagerConfig, networkConfig, connectionInfo, memType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 03e8e63..aab3c5f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.configuration.Configuration
 
 import scala.concurrent.duration.FiniteDuration
@@ -28,4 +30,27 @@ case class TaskManagerConfiguration(
     timeout: FiniteDuration,
     maxRegistrationDuration: Option[FiniteDuration],
     numberOfSlots: Int,
-    configuration: Configuration)
+    configuration: Configuration,
+    initialRegistrationPause: FiniteDuration,
+    maxRegistrationPause: FiniteDuration,
+    refusedRegistrationPause: FiniteDuration) {
+
+  def this(
+      tmpDirPaths: Array[String],
+      cleanupInterval: Long,
+      timeout: FiniteDuration,
+      maxRegistrationDuration: Option[FiniteDuration],
+      numberOfSlots: Int,
+      configuration: Configuration) {
+    this (
+      tmpDirPaths,
+      cleanupInterval,
+      timeout,
+      maxRegistrationDuration,
+      numberOfSlots,
+      configuration,
+      FiniteDuration(500, TimeUnit.MILLISECONDS),
+      FiniteDuration(30, TimeUnit.SECONDS),
+      FiniteDuration(10, TimeUnit.SECONDS))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 2fe830f..9640fcd 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,7 +22,6 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
@@ -339,10 +338,10 @@ trait TestingJobManagerLike extends FlinkActor {
       }
 
     // TaskManager may be registered on these two messages
-    case msg @ (_: RegisterTaskManager | _: RegisterResourceSuccessful) =>
+    case msg @ (_: RegisterTaskManager) =>
       super.handleMessage(msg)
 
-      // dequeue all senders which wait for 
instanceManager.getNumberOfRegisteredTaskManagers or
+      // dequeue all senders which wait for 
instanceManager.getNumberOfRegisteredTaskManagers or
       // fewer registered TaskManagers
       while (waitForNumRegisteredTaskManagers.nonEmpty &&
         waitForNumRegisteredTaskManagers.head._1 <=

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index c5eb155..d6bcaaf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -30,10 +30,10 @@ import java.util.UUID;
  */
 public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 
-       private final String leaderAddress;
-       private final UUID leaderSessionID;
+       private volatile String leaderAddress;
+       private volatile UUID leaderSessionID;
 
-       private LeaderRetrievalListener listener;
+       private volatile LeaderRetrievalListener listener;
 
        public TestingLeaderRetrievalService() {
                this(null, null);
@@ -59,6 +59,9 @@ public class TestingLeaderRetrievalService implements 
LeaderRetrievalService {
        }
 
        public void notifyListener(String address, UUID leaderSessionID) {
+               this.leaderAddress = address;
+               this.leaderSessionID = leaderSessionID;
+
                if (listener != null) {
                        listener.notifyLeaderAddress(address, leaderSessionID);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index d75341f..043c81c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -18,32 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import 
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 import scala.Option;
 
 import java.util.ArrayList;
@@ -198,15 +192,10 @@ public class ResourceManagerTest {
                        ResourceID resourceID = ResourceID.generate();
 
                        // Send task manager registration
-                       resourceManager.tell(new RegisterResource(
-                               ActorRef.noSender(),
-                               new 
RegistrationMessages.RegisterTaskManager(resourceID,
-                                       
Mockito.mock(InstanceConnectionInfo.class),
-                                       null,
-                                       1)),
+                       resourceManager.tell(new 
NotifyResourceStarted(resourceID),
                                fakeJobManager);
 
-                       expectMsgClass(RegisterResourceSuccessful.class);
+                       expectMsgClass(Acknowledge.class);
 
                        // check for number registration of registered resources
                        resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -216,15 +205,10 @@ public class ResourceManagerTest {
                        assertEquals(1, reply.resources.size());
 
                        // Send task manager registration again
-                       resourceManager.tell(new RegisterResource(
-                                       ActorRef.noSender(),
-                                       new 
RegistrationMessages.RegisterTaskManager(resourceID,
-                                               
Mockito.mock(InstanceConnectionInfo.class),
-                                               null,
-                                               1)),
+                       resourceManager.tell(new 
NotifyResourceStarted(resourceID),
                                fakeJobManager);
 
-                       expectMsgClass(RegisterResourceSuccessful.class);
+                       expectMsgClass(Acknowledge.class);
 
                        // check for number registration of registered resources
                        resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -232,17 +216,11 @@ public class ResourceManagerTest {
 
                        assertEquals(1, reply.resources.size());
 
-
                        // Send invalid null resource id to throw an exception 
during resource registration
-                       resourceManager.tell(new RegisterResource(
-                                       ActorRef.noSender(),
-                                       new 
RegistrationMessages.RegisterTaskManager(null,
-                                               
Mockito.mock(InstanceConnectionInfo.class),
-                                               null,
-                                               1)),
+                       resourceManager.tell(new NotifyResourceStarted(null),
                                fakeJobManager);
 
-                       expectMsgClass(RegisterResourceFailed.class);
+                       expectMsgClass(Acknowledge.class);
 
                        // check for number registration of registered resources
                        resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -275,15 +253,10 @@ public class ResourceManagerTest {
                        resourceManager.tell(new RemoveResource(resourceID), 
fakeJobManager);
 
                        // Send task manager registration
-                       resourceManager.tell(new RegisterResource(
-                                       ActorRef.noSender(),
-                                       new 
RegistrationMessages.RegisterTaskManager(resourceID,
-                                               
Mockito.mock(InstanceConnectionInfo.class),
-                                               null,
-                                               1)),
+                       resourceManager.tell(new 
NotifyResourceStarted(resourceID),
                                fakeJobManager);
 
-                       expectMsgClass(RegisterResourceSuccessful.class);
+                       expectMsgClass(Acknowledge.class);
 
                        // check for number registration of registered resources
                        resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
@@ -329,25 +302,16 @@ public class ResourceManagerTest {
                        ResourceID resourceID2 = ResourceID.generate();
 
                        // Send task manager registration
-                       resourceManager.tell(new RegisterResource(
-                                       ActorRef.noSender(),
-                                       new 
RegistrationMessages.RegisterTaskManager(resourceID1,
-                                               
Mockito.mock(InstanceConnectionInfo.class),
-                                               null,
-                                               1)),
+                       resourceManager.tell(new 
NotifyResourceStarted(resourceID1),
                                fakeJobManager);
 
+                       expectMsgClass(Acknowledge.class);
+
                        // Send task manager registration
-                       resourceManager.tell(new RegisterResource(
-                                       ActorRef.noSender(),
-                                       new 
RegistrationMessages.RegisterTaskManager(resourceID2,
-                                               
Mockito.mock(InstanceConnectionInfo.class),
-                                               null,
-                                               1)),
+                       resourceManager.tell(new 
NotifyResourceStarted(resourceID2),
                                fakeJobManager);
 
-                       expectMsgClass(RegisterResourceSuccessful.class);
-                       expectMsgClass(RegisterResourceSuccessful.class);
+                       expectMsgClass(Acknowledge.class);
 
                        // check for number registration of registered resources
                        resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 685fa9a..e23aba7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -50,6 +50,7 @@ import scala.Option;
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -286,14 +287,18 @@ public class TaskManagerRegistrationTest extends 
TestLogger {
                                jm = 
TestingUtils.createForwardingActor(actorSystem, getTestActor(), 
Option.<String>empty());
                                final ActorGateway jmGateway = jm;
 
+                               FiniteDuration refusedRegistrationPause = new 
FiniteDuration(500, TimeUnit.MILLISECONDS);
+                               Configuration tmConfig = new 
Configuration(config);
+                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause.toString());
+
                                // we make the test actor (the test kit) the 
JobManager to intercept
                                // the messages
                                taskManager = createTaskManager(
-                                               actorSystem,
-                                               jmGateway,
-                                               config,
-                                               true,
-                                               false);
+                                       actorSystem,
+                                       jmGateway,
+                                       tmConfig,
+                                       true,
+                                       false);
 
                                final ActorGateway taskManagerGateway = 
taskManager;
 
@@ -312,8 +317,10 @@ public class TaskManagerRegistrationTest extends 
TestLogger {
                                        }
                                };
 
+
+
                                // the TaskManager should wait a bit and retry...
-                               FiniteDuration maxDelay = (FiniteDuration) 
TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
+                               FiniteDuration maxDelay = (FiniteDuration) 
refusedRegistrationPause.$times(3.0);
                                new Within(maxDelay) {
 
                                        @Override
@@ -333,6 +340,94 @@ public class TaskManagerRegistrationTest extends 
TestLogger {
        }
 
        /**
+        * Tests that the TaskManager does not send an excessive number of registration
+        * messages to the job manager if its registration was rejected.
+        */
+       @Test
+       public void testTaskManagerNoExcessiveRegistrationMessages() throws 
Exception {
+               new JavaTestKit(actorSystem) {{
+                       ActorGateway jm = null;
+                       ActorGateway taskManager =null;
+                       try {
+                               FiniteDuration timeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+
+                               jm = 
TestingUtils.createForwardingActor(actorSystem, getTestActor(), 
Option.<String>empty());
+                               final ActorGateway jmGateway = jm;
+
+                               long refusedRegistrationPause = 500;
+                               long initialRegistrationPause = 100;
+                               long maxDelay = 30000;
+
+                               Configuration tmConfig = new 
Configuration(config);
+                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause + " ms");
+                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, 
initialRegistrationPause + " ms");
+
+                               // we make the test actor (the test kit) the 
JobManager to intercept
+                               // the messages
+                               taskManager = createTaskManager(
+                                       actorSystem,
+                                       jmGateway,
+                                       tmConfig,
+                                       true,
+                                       false);
+
+                               final ActorGateway taskManagerGateway = 
taskManager;
+
+                               final Deadline deadline = timeout.fromNow();
+
+                               try {
+                                       while (deadline.hasTimeLeft()) {
+                                               // the TaskManager should try 
to register
+                                               
expectMsgClass(deadline.timeLeft(), RegisterTaskManager.class);
+
+                                               // we decline the registration
+                                               taskManagerGateway.tell(
+                                                       new 
RefuseRegistration(new Exception("test reason")),
+                                                       jmGateway);
+                                       }
+                               } catch (AssertionError error) {
+                                       // ignore since it simply means that we 
have used up all our time
+                               }
+
+                               RegisterTaskManager[] 
registerTaskManagerMessages = new 
ReceiveWhile<RegisterTaskManager>(RegisterTaskManager.class, timeout) {
+                                       @Override
+                                       protected RegisterTaskManager 
match(Object msg) throws Exception {
+                                               if (msg instanceof 
RegisterTaskManager) {
+                                                       return 
(RegisterTaskManager) msg;
+                                               } else {
+                                                       throw noMatch();
+                                               }
+                                       }
+                               }.get();
+
+                               int maxExponent = (int) 
Math.floor(Math.log(((double) maxDelay / initialRegistrationPause + 
1))/Math.log(2));
+                               int exponent = (int) 
Math.ceil(Math.log(((double) timeout.toMillis() / initialRegistrationPause + 
1))/Math.log(2));
+
+                               int exp = Math.min(maxExponent, exponent);
+
+                               long difference = timeout.toMillis() - 
(initialRegistrationPause * (1 << exp));
+
+                               int numberRegisterTaskManagerMessages = exp;
+
+                               if (difference > 0) {
+                                       numberRegisterTaskManagerMessages += 
Math.ceil((double) difference / maxDelay);
+                               }
+
+                               int 
maxExpectedNumberOfRegisterTaskManagerMessages = 
numberRegisterTaskManagerMessages * 2;
+
+                               assertTrue("The number of RegisterTaskManager 
messages #"
+                                       + registerTaskManagerMessages.length
+                                       + " should be less than #"
+                                       + 
maxExpectedNumberOfRegisterTaskManagerMessages,
+                                       registerTaskManagerMessages.length <= 
maxExpectedNumberOfRegisterTaskManagerMessages);
+                       } finally {
+                               stopActor(taskManager);
+                               stopActor(jm);
+                       }
+               }};
+       }
+
+       /**
         * Validate that the TaskManager attempts to re-connect after it lost 
the connection
         * to the JobManager.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index dcb5334..94f9348 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -65,6 +65,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-yarn_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>${shading-artifact.name}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 3ed0dc1..a3337bb 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -43,8 +43,13 @@ public class TestingYarnClusterDescriptor extends 
AbstractYarnClusterDescriptor
                Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
                        "jar. Make sure to package the flink-runtime module.");
 
+               File testingYarnJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn"));
+               Preconditions.checkNotNull(testingYarnJar, "Could not find the flink-yarn tests " +
+                       "jar. Make sure to package the flink-yarn module.");
+
                filesToShip.add(testingJar);
                filesToShip.add(testingRuntimeJar);
+               filesToShip.add(testingYarnJar);
 
                addShipFiles(filesToShip);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
deleted file mode 100644
index 5a61b8f..0000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * Flink's testing resource manager for Yarn.
- */
-public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
-
-       public TestingYarnFlinkResourceManager(
-               Configuration flinkConfig,
-               YarnConfiguration yarnConfig,
-               LeaderRetrievalService leaderRetrievalService,
-               String applicationMasterHostName,
-               String webInterfaceURL,
-               ContaineredTaskManagerParameters taskManagerParameters,
-               ContainerLaunchContext taskManagerLaunchContext,
-               int yarnHeartbeatIntervalMillis,
-               int maxFailedContainers,
-               int numInitialTaskManagers) {
-
-               super(
-                       flinkConfig,
-                       yarnConfig,
-                       leaderRetrievalService,
-                       applicationMasterHostName,
-                       webInterfaceURL,
-                       taskManagerParameters,
-                       taskManagerLaunchContext,
-                       yarnHeartbeatIntervalMillis,
-                       maxFailedContainers,
-                       numInitialTaskManagers);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 12db578..b770c63 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -43,7 +43,7 @@ under the License.
                                </exclusion>
                        </exclusions>
                </dependency>
-               
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.10</artifactId>
@@ -64,7 +64,6 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-
                <dependency>
                        <groupId>com.typesafe.akka</groupId>
                        
<artifactId>akka-actor_${scala.binary.version}</artifactId>
@@ -85,8 +84,20 @@ under the License.
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
                </dependency>
-               
-               
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
@@ -197,6 +208,18 @@ under the License.
                                        
<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
                                </configuration>
                        </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
index 974c4df..cb2f40a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -30,7 +30,6 @@ import static java.util.Objects.requireNonNull;
  */
 public class RegisteredYarnWorkerNode implements ResourceIDRetrievable {
 
-
        /** The container on which the worker runs */
        private final Container yarnContainer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 883a860..3c85795 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
@@ -122,18 +123,75 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
        private RegisterApplicationMasterResponseReflector 
applicationMasterResponseReflector =
                new RegisterApplicationMasterResponseReflector(LOG);
 
+       public YarnFlinkResourceManager(
+               Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               LeaderRetrievalService leaderRetrievalService,
+               String applicationMasterHostName,
+               String webInterfaceURL,
+               ContaineredTaskManagerParameters taskManagerParameters,
+               ContainerLaunchContext taskManagerLaunchContext,
+               int yarnHeartbeatIntervalMillis,
+               int maxFailedContainers,
+               int numInitialTaskManagers) {
+
+               this(
+                       flinkConfig,
+                       yarnConfig,
+                       leaderRetrievalService,
+                       applicationMasterHostName,
+                       webInterfaceURL,
+                       taskManagerParameters,
+                       taskManagerLaunchContext,
+                       yarnHeartbeatIntervalMillis,
+                       maxFailedContainers,
+                       numInitialTaskManagers,
+                       new YarnResourceManagerCallbackHandler());
+       }
 
        public YarnFlinkResourceManager(
-                       Configuration flinkConfig,
-                       YarnConfiguration yarnConfig,
-                       LeaderRetrievalService leaderRetrievalService,
-                       String applicationMasterHostName,
-                       String webInterfaceURL,
-                       ContaineredTaskManagerParameters taskManagerParameters,
-                       ContainerLaunchContext taskManagerLaunchContext,
-                       int yarnHeartbeatIntervalMillis,
-                       int maxFailedContainers,
-                       int numInitialTaskManagers) {
+               Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               LeaderRetrievalService leaderRetrievalService,
+               String applicationMasterHostName,
+               String webInterfaceURL,
+               ContaineredTaskManagerParameters taskManagerParameters,
+               ContainerLaunchContext taskManagerLaunchContext,
+               int yarnHeartbeatIntervalMillis,
+               int maxFailedContainers,
+               int numInitialTaskManagers,
+               YarnResourceManagerCallbackHandler callbackHandler) {
+
+               this(
+                       flinkConfig,
+                       yarnConfig,
+                       leaderRetrievalService,
+                       applicationMasterHostName,
+                       webInterfaceURL,
+                       taskManagerParameters,
+                       taskManagerLaunchContext,
+                       yarnHeartbeatIntervalMillis,
+                       maxFailedContainers,
+                       numInitialTaskManagers,
+                       callbackHandler,
+                       
AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, 
callbackHandler),
+                       NMClient.createNMClient());
+       }
+
+       public YarnFlinkResourceManager(
+               Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               LeaderRetrievalService leaderRetrievalService,
+               String applicationMasterHostName,
+               String webInterfaceURL,
+               ContaineredTaskManagerParameters taskManagerParameters,
+               ContainerLaunchContext taskManagerLaunchContext,
+               int yarnHeartbeatIntervalMillis,
+               int maxFailedContainers,
+               int numInitialTaskManagers,
+               YarnResourceManagerCallbackHandler callbackHandler,
+               AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient,
+               NMClient nodeManagerClient) {
 
                super(numInitialTaskManagers, flinkConfig, 
leaderRetrievalService);
 
@@ -145,6 +203,10 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
                this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMillis;
                this.maxFailedContainers = maxFailedContainers;
 
+               this.resourceManagerCallbackHandler = 
Preconditions.checkNotNull(callbackHandler);
+               this.resourceManagerClient = 
Preconditions.checkNotNull(resourceManagerClient);
+               this.nodeManagerClient = 
Preconditions.checkNotNull(nodeManagerClient);
+
                this.containersInLaunch = new HashMap<>();
                this.containersBeingReturned = new HashMap<>();
        }
@@ -178,16 +240,12 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
        protected void initialize() throws Exception {
                LOG.info("Initializing YARN resource master");
 
-               // create the client to communicate with the ResourceManager
-               resourceManagerCallbackHandler = new 
YarnResourceManagerCallbackHandler(self());
+               resourceManagerCallbackHandler.initialize(self());
 
-               resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
-                       yarnHeartbeatIntervalMillis, 
resourceManagerCallbackHandler);
                resourceManagerClient.init(yarnConfig);
                resourceManagerClient.start();
 
                // create the client to communicate with the node managers
-               nodeManagerClient = NMClient.createNMClient();
                nodeManagerClient.init(yarnConfig);
                nodeManagerClient.start();
                nodeManagerClient.cleanupRunningContainersOnStop(true);
@@ -277,7 +335,7 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
                        Priority priority = Priority.newInstance(0);
 
                        // Resource requirements for worker containers
-                       int taskManagerSlots = 
Integer.valueOf(System.getenv(YarnConfigKeys.ENV_SLOTS));
+                       int taskManagerSlots = taskManagerParameters.numSlots();
                        int vcores = 
config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
                        Resource capability = 
Resource.newInstance(containerMemorySizeMB, vcores);
 
@@ -300,7 +358,7 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
        }
 
        @Override
-       protected void releaseRegisteredWorker(RegisteredYarnWorkerNode worker) 
{
+       protected void releaseStartedWorker(RegisteredYarnWorkerNode worker) {
                releaseYarnContainer(worker.yarnContainer());
        }
 
@@ -323,10 +381,13 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
        }
 
        @Override
-       protected RegisteredYarnWorkerNode workerRegistered(ResourceID 
resourceID) throws Exception {
+       protected RegisteredYarnWorkerNode workerStarted(ResourceID resourceID) 
{
                YarnContainerInLaunch inLaunch = 
containersInLaunch.remove(resourceID);
                if (inLaunch == null) {
-                       throw new Exception("Cannot register Worker - unknown 
resource id " + resourceID);
+                       // Container was not in state "being launched", this 
can indicate that the TaskManager
+                       // in this container was already registered or that the 
container was not started
+                       // by this resource manager. Simply ignore this 
resourceID.
+                       return null;
                } else {
                        return new 
RegisteredYarnWorkerNode(inLaunch.container());
                }
@@ -345,8 +406,12 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
                                accepted.add(new 
RegisteredYarnWorkerNode(yci.container()));
                        }
                        else {
-                               LOG.info("YARN container consolidation does not 
recognize TaskManager {}",
-                                       resourceID);
+                               if (isStarted(resourceID)) {
+                                       LOG.info("TaskManager {} has already 
been registered at the resource manager.", resourceID);
+                               } else {
+                                       LOG.info("YARN container consolidation 
does not recognize TaskManager {}",
+                                               resourceID);
+                               }
                        }
                }
                return accepted;
@@ -368,7 +433,7 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
 
        private void containersAllocated(List<Container> containers) {
                final int numRequired = getDesignatedWorkerPoolSize();
-               final int numRegistered = getNumberOfRegisteredTaskManagers();
+               final int numRegistered = getNumberOfStartedTaskManagers();
 
                for (Container container : containers) {
                        numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
@@ -519,7 +584,7 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
 
        private void updateProgress() {
                final int required = getDesignatedWorkerPoolSize();
-               final int available = getNumberOfRegisteredTaskManagers() + 
containersInLaunch.size();
+               final int available = getNumberOfStartedTaskManagers() + 
containersInLaunch.size();
                final float progress = (required <= 0) ? 1.0f : available / 
(float) required;
 
                if (resourceManagerCallbackHandler != null) {
@@ -583,7 +648,7 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
                 * @return A list with containers from previous application attempt.
                 */
                private List<Container> 
getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
-                       if (method != null) {
+                       if (method != null && response != null) {
                                try {
                                        @SuppressWarnings("unchecked")
                                        List<Container> list = 
(List<Container>) method.invoke(response);

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
index 1e287c2..2372cbc 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java
@@ -42,12 +42,19 @@ public class YarnResourceManagerCallbackHandler implements 
AMRMClientAsync.Callb
 
        /** The progress we report */
        private float currentProgress;
-       
+
+       public YarnResourceManagerCallbackHandler() {
+               this(null);
+       }
        
        public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) 
{
                this.yarnFrameworkMaster = yarnFrameworkMaster;
        }
 
+       public void initialize(ActorRef yarnFrameworkMaster) {
+               this.yarnFrameworkMaster = yarnFrameworkMaster;
+       }
+
        /**
         * Sets the current progress.
         * @param progress The current progress fraction.
@@ -65,16 +72,20 @@ public class YarnResourceManagerCallbackHandler implements 
AMRMClientAsync.Callb
 
        @Override
        public void onContainersCompleted(List<ContainerStatus> list) {
-               yarnFrameworkMaster.tell(
-                       new ContainersComplete(list),
-                       ActorRef.noSender());
+               if (yarnFrameworkMaster != null) {
+                       yarnFrameworkMaster.tell(
+                               new ContainersComplete(list),
+                               ActorRef.noSender());
+               }
        }
 
        @Override
        public void onContainersAllocated(List<Container> containers) {
-               yarnFrameworkMaster.tell(
-                       new ContainersAllocated(containers),
-                       ActorRef.noSender());
+               if (yarnFrameworkMaster != null) {
+                       yarnFrameworkMaster.tell(
+                               new ContainersAllocated(containers),
+                               ActorRef.noSender());
+               }
        }
 
        @Override
@@ -89,8 +100,10 @@ public class YarnResourceManagerCallbackHandler implements 
AMRMClientAsync.Callb
 
        @Override
        public void onError(Throwable error) {
-               yarnFrameworkMaster.tell(
-                       new FatalErrorOccurred("Connection to YARN Resource Manager failed", error),
-                       ActorRef.noSender());
+               if (yarnFrameworkMaster != null) {
+                       yarnFrameworkMaster.tell(
+                               new FatalErrorOccurred("Connection to YARN Resource Manager failed", error),
+                               ActorRef.noSender());
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
new file mode 100644
index 0000000..f03c604
--- /dev/null
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
+
+       private final PriorityQueue<Tuple2<Integer, ActorRef>> waitingQueue = 
new PriorityQueue<>(32, new Comparator<Tuple2<Integer, ActorRef>>() {
+               @Override
+               public int compare(Tuple2<Integer, ActorRef> o1, 
Tuple2<Integer, ActorRef> o2) {
+                       return o1.f0 - o2.f0;
+               }
+       });
+
+       public TestingYarnFlinkResourceManager(
+               Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               LeaderRetrievalService leaderRetrievalService,
+               String applicationMasterHostName,
+               String webInterfaceURL,
+               ContaineredTaskManagerParameters taskManagerParameters,
+               ContainerLaunchContext taskManagerLaunchContext,
+               int yarnHeartbeatIntervalMillis,
+               int maxFailedContainers,
+               int numInitialTaskManagers) {
+
+               super(flinkConfig,
+                       yarnConfig,
+                       leaderRetrievalService,
+                       applicationMasterHostName,
+                       webInterfaceURL,
+                       taskManagerParameters,
+                       taskManagerLaunchContext,
+                       yarnHeartbeatIntervalMillis,
+                       maxFailedContainers,
+                       numInitialTaskManagers);
+       }
+
+       public TestingYarnFlinkResourceManager(
+               Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               LeaderRetrievalService leaderRetrievalService,
+               String applicationMasterHostName,
+               String webInterfaceURL,
+               ContaineredTaskManagerParameters taskManagerParameters,
+               ContainerLaunchContext taskManagerLaunchContext,
+               int yarnHeartbeatIntervalMillis,
+               int maxFailedContainers,
+               int numInitialTaskManagers,
+               YarnResourceManagerCallbackHandler callbackHandler,
+               AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient,
+               NMClient nodeManagerClient) {
+               super(flinkConfig, yarnConfig, leaderRetrievalService, 
applicationMasterHostName, webInterfaceURL, taskManagerParameters, 
taskManagerLaunchContext, yarnHeartbeatIntervalMillis, maxFailedContainers, 
numInitialTaskManagers, callbackHandler, resourceManagerClient, 
nodeManagerClient);
+       }
+
+       @Override
+       protected void handleMessage(Object message) {
+               if (message instanceof RequestNumberOfRegisteredResources) {
+                       getSender().tell(getNumberOfStartedTaskManagers(), 
getSelf());
+               } else if (message instanceof NotifyWhenResourcesRegistered) {
+                       NotifyWhenResourcesRegistered notifyMessage = 
(NotifyWhenResourcesRegistered) message;
+
+                       if (getNumberOfStartedTaskManagers() >= 
notifyMessage.getNumberResources()) {
+                               getSender().tell(true, getSelf());
+                       } else {
+                               
waitingQueue.offer(Tuple2.of(notifyMessage.getNumberResources(), getSender()));
+                       }
+               } else if (message instanceof NotifyResourceStarted) {
+                       super.handleMessage(message);
+
+                       while (!waitingQueue.isEmpty() && 
waitingQueue.peek().f0 <= getNumberOfStartedTaskManagers()) {
+                               ActorRef receiver = waitingQueue.poll().f1;
+                               receiver.tell(true, getSelf());
+                       }
+               } else {
+                       super.handleMessage(message);
+               }
+       }
+}

Reply via email to