[FLINK-4152] Allow re-registration of TMs at resource manager - Add YarnFlinkResourceManager test to re-accept task manager registrations from a re-elected job manager
- Remove unnecessary sync logic between JobManager and ResourceManager - Avoid duplicate registration attempts in case of a refused registration - Add test case to check that an excessive number of RegisterTaskManager messages is not sent - Remove containersLaunched from YarnFlinkResourceManager and instead do not clear registeredWorkers when JobManager loses leadership - Let YarnFlinkResourceManagerTest extend TestLogger - Harden YarnFlinkResourceManager.getContainersFromPreviousAttempts - Add FatalErrorOccurred message handler to FlinkResourceManager; Increase timeout for YarnFlinkResourceManagerTest; Add additional constructor to TestingYarnFlinkResourceManager for tests - Rename registeredWorkers field to startedWorkers. Additionally, the RegisterResource message is renamed to NotifyResourceStarted, which tells the RM that a resource has been started. This reflects the current semantics of the startedWorkers map in the resource manager. - Fix concurrency issues in TestingLeaderRetrievalService This closes #2257 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2648bc1a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2648bc1a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2648bc1a Branch: refs/heads/master Commit: 2648bc1a5a5faed8c2061bcab40a8949fd02751c Parents: c6715b7 Author: Till Rohrmann <[email protected]> Authored: Fri Jul 15 10:51:59 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Tue Jul 26 16:39:22 2016 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 34 ++- .../clusterframework/FlinkResourceManager.java | 119 ++++---- .../messages/NotifyResourceStarted.java | 47 +++ .../messages/RegisterResource.java | 55 ---- .../messages/RegisterResourceFailed.java | 68 ----- .../messages/RegisterResourceSuccessful.java | 58 ---- .../standalone/StandaloneResourceManager.java | 4 +- 
.../testutils/TestingResourceManager.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 51 +--- .../runtime/messages/RegistrationMessages.scala | 8 +- .../flink/runtime/taskmanager/TaskManager.scala | 202 +++++++++---- .../taskmanager/TaskManagerConfiguration.scala | 27 +- .../testingUtils/TestingJobManagerLike.scala | 5 +- .../TestingLeaderRetrievalService.java | 9 +- .../resourcemanager/ResourceManagerTest.java | 66 +--- .../TaskManagerRegistrationTest.java | 107 ++++++- flink-yarn-tests/pom.xml | 8 + .../yarn/TestingYarnClusterDescriptor.java | 5 + .../yarn/TestingYarnFlinkResourceManager.java | 56 ---- flink-yarn/pom.xml | 31 +- .../flink/yarn/RegisteredYarnWorkerNode.java | 1 - .../flink/yarn/YarnFlinkResourceManager.java | 113 +++++-- .../YarnResourceManagerCallbackHandler.java | 33 +- .../yarn/TestingYarnFlinkResourceManager.java | 111 +++++++ .../yarn/YarnFlinkResourceManagerTest.java | 298 +++++++++++++++++++ .../messages/NotifyWhenResourcesRegistered.java | 32 ++ .../RequestNumberOfRegisteredResources.java | 25 ++ .../src/test/resources/log4j-test.properties | 27 ++ flink-yarn/src/test/resources/logback-test.xml | 34 +++ 29 files changed, 1127 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index e40bed3..028732a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -243,11 +243,28 @@ public final class ConfigConstants { public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 
"taskmanager.debug.memory.logIntervalMs"; /** - * + * Defines the maximum time it can take for the TaskManager registration. If the duration is + * exceeded without a successful registration, then the TaskManager terminates. */ public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration"; /** + * The initial registration pause between two consecutive registration attempts. The pause + * is doubled for each new registration attempt until it reaches the maximum registration pause. + */ + public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause"; + + /** + * The maximum registration pause between two consecutive registration attempts. + */ + public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause"; + + /** + * The pause after a registration has been refused by the job manager before retrying to connect. + */ + public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause"; + + /** * Time interval between two successive task cancellation attempts in milliseconds. */ @PublicEvolving @@ -788,6 +805,21 @@ public final class ConfigConstants { public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf"; /** + * The default task manager's initial registration pause. + */ + public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms"; + + /** + * The default task manager's maximum registration pause. + */ + public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s"; + + /** + * The default task manager's refused registration pause. 
+ */ + public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s"; + + /** * The default setting for TaskManager memory eager allocation of managed memory */ public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false; http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index d28d4aa..95be084 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -34,10 +34,8 @@ import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContai import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; -import org.apache.flink.runtime.clusterframework.messages.RegisterResource; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; @@ -51,10 +49,9 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; -import org.apache.flink.runtime.messages.RegistrationMessages; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import scala.concurrent.Future; @@ -89,7 +86,7 @@ import static java.util.Objects.requireNonNull; * <li>At some point, the TaskManager processes will have started and send a registration * message to the JobManager. The JobManager will perform * a lookup with the ResourceManager to check if it really started this TaskManager. - * The method {@link #workerRegistered(ResourceID)} will be called + * The method {@link #workerStarted(ResourceID)} will be called * to inform about a registered worker.</li> * </ol> * @@ -113,8 +110,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva /** The service to find the right leader JobManager (to support high availability) */ private final LeaderRetrievalService leaderRetriever; - /** The currently registered resources */ - private final Map<ResourceID, WorkerType> registeredWorkers; + /** Map which contains the workers from which we know that they have been successfully started + * in a container. This notification is sent by the JM when a TM tries to register at it. 
*/ + private final Map<ResourceID, WorkerType> startedWorkers; /** List of listeners for info messages */ private final Set<ActorRef> infoMessageListeners; @@ -141,7 +139,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva LeaderRetrievalService leaderRetriever) { this.config = requireNonNull(flinkConfig); this.leaderRetriever = requireNonNull(leaderRetriever); - this.registeredWorkers = new HashMap<>(); + this.startedWorkers = new HashMap<>(); FiniteDuration lt; try { @@ -230,9 +228,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva // --- lookup of registered resources - else if (message instanceof RegisterResource) { - RegisterResource msg = (RegisterResource) message; - handleRegisterResource(sender(), msg.getTaskManager(), msg.getRegisterMessage()); + else if (message instanceof NotifyResourceStarted) { + NotifyResourceStarted msg = (NotifyResourceStarted) message; + handleResourceStarted(sender(), msg.getResourceID()); } // --- messages about JobManager leader status and registration @@ -273,6 +271,11 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva infoMessageListeners.remove(sender()); } + else if (message instanceof FatalErrorOccurred) { + FatalErrorOccurred fatalErrorOccurred = (FatalErrorOccurred) message; + fatalError(fatalErrorOccurred.message(), fatalErrorOccurred.error()); + } + // --- unknown messages else { @@ -307,73 +310,68 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva } /** - * Gets the number of currently registered TaskManagers. + * Gets the number of currently started TaskManagers. * - * @return The number of currently registered TaskManagers. + * @return The number of currently started TaskManagers. 
*/ - public int getNumberOfRegisteredTaskManagers() { - return registeredWorkers.size(); + public int getNumberOfStartedTaskManagers() { + return startedWorkers.size(); } /** * Gets the currently registered resources. * @return */ - public Collection<WorkerType> getRegisteredTaskManagers() { - return registeredWorkers.values(); + public Collection<WorkerType> getStartedTaskManagers() { + return startedWorkers.values(); } /** - * Gets the registered worker for a given resource ID, if one is available. + * Gets the started worker for a given resource ID, if one is available. * * @param resourceId The resource ID for the worker. * @return True if already registered, otherwise false */ - public boolean isRegistered(ResourceID resourceId) { - return registeredWorkers.containsKey(resourceId); + public boolean isStarted(ResourceID resourceId) { + return startedWorkers.containsKey(resourceId); } /** - * Gets an iterable for all currently registered TaskManagers. + * Gets an iterable for all currently started TaskManagers. * - * @return All currently registered TaskManagers. + * @return All currently started TaskManagers. */ - public Collection<WorkerType> allRegisteredWorkers() { - return registeredWorkers.values(); + public Collection<WorkerType> allStartedWorkers() { + return startedWorkers.values(); } /** - * Register a resource on which a TaskManager has been started + * Tells the ResourceManager that a TaskManager had been started in a container with the given + * resource id. 
+ * * @param jobManager The sender (JobManager) of the message - * @param taskManager The task manager who wants to register - * @param msg The task manager's registration message + * @param resourceID The resource id of the started TaskManager */ - private void handleRegisterResource(ActorRef jobManager, ActorRef taskManager, - RegistrationMessages.RegisterTaskManager msg) { - - ResourceID resourceID = msg.resourceId(); - try { - Preconditions.checkNotNull(resourceID); + private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) { + if (resourceID != null) { // check if resourceID is already registered (TaskManager may send duplicate register messages) - WorkerType oldWorker = registeredWorkers.get(resourceID); + WorkerType oldWorker = startedWorkers.get(resourceID); if (oldWorker != null) { - LOG.debug("TaskManager {} had been registered before.", resourceID); + LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID); } else { - WorkerType newWorker = workerRegistered(resourceID); - registeredWorkers.put(resourceID, newWorker); - LOG.info("TaskManager {} has registered.", resourceID); - } - jobManager.tell(decorateMessage( - new RegisterResourceSuccessful(taskManager, msg)), - self()); - } catch (Exception e) { - LOG.warn("TaskManager resource registration failed for {}", resourceID, e); + WorkerType newWorker = workerStarted(resourceID); - // tell the JobManager about the failure - String eStr = ExceptionUtils.stringifyException(e); - sender().tell(decorateMessage( - new RegisterResourceFailed(taskManager, resourceID, eStr)), self()); + if (newWorker != null) { + startedWorkers.put(resourceID, newWorker); + LOG.info("TaskManager {} has started.", resourceID); + } else { + LOG.info("TaskManager {} has not been started by this resource manager.", resourceID); + } + } } + + // Acknowledge the resource registration + jobManager.tell(decorateMessage(Acknowledge.get()), self()); } /** @@ -384,9 +382,9 @@ public 
abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva */ private void removeRegisteredResource(ResourceID resourceId) { - WorkerType worker = registeredWorkers.remove(resourceId); + WorkerType worker = startedWorkers.remove(resourceId); if (worker != null) { - releaseRegisteredWorker(worker); + releaseStartedWorker(worker); } else { LOG.warn("Resource {} could not be released", resourceId); } @@ -463,8 +461,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva } /** - * This method disassociates from the current leader JobManager. All currently registered - * TaskManagers are put under "awaiting registration". + * This method disassociates from the current leader JobManager. */ private void jobManagerLostLeadership() { if (jobManager != null) { @@ -474,8 +471,6 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva leaderSessionID = null; infoMessageListeners.clear(); - - registeredWorkers.clear(); } } @@ -510,7 +505,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva // put the consolidated TaskManagers into our bookkeeping for (WorkerType worker : consolidated) { ResourceID resourceID = worker.getResourceID(); - registeredWorkers.put(resourceID, worker); + startedWorkers.put(resourceID, worker); toHandle.remove(resourceID); } } @@ -568,7 +563,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva "Number of pending workers pending registration should never be below 0."); // see how many workers we want, and whether we have enough - int allAvailableAndPending = registeredWorkers.size() + + int allAvailableAndPending = startedWorkers.size() + numWorkersPending + numWorkersPendingRegistration; int missing = designatedPoolSize - allAvailableAndPending; @@ -619,7 +614,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva * @param message An informational message that explains why the worker 
failed. */ public void notifyWorkerFailed(ResourceID resourceID, String message) { - WorkerType worker = registeredWorkers.remove(resourceID); + WorkerType worker = startedWorkers.remove(resourceID); if (worker != null) { jobManager.tell( decorateMessage( @@ -676,16 +671,16 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva protected abstract void releasePendingWorker(ResourceID resourceID); /** - * Trigger a release of a registered worker. + * Trigger a release of a started worker. * @param resourceID The worker resource id */ - protected abstract void releaseRegisteredWorker(WorkerType resourceID); + protected abstract void releaseStartedWorker(WorkerType resourceID); /** - * Callback when a worker was registered. + * Callback when a worker was started. * @param resourceID The worker resource id */ - protected abstract WorkerType workerRegistered(ResourceID resourceID) throws Exception; + protected abstract WorkerType workerStarted(ResourceID resourceID); /** * This method is called when the resource manager starts after a failure and reconnects to http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java new file mode 100644 index 0000000..1427ba8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.messages; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +/** + * Notifies the ResourceManager that a TaskManager has been started in a container with the given + * resource id. + */ +public class NotifyResourceStarted implements RequiresLeaderSessionID, java.io.Serializable { + private static final long serialVersionUID = 1L; + + private final ResourceID resourceID; + + public NotifyResourceStarted(ResourceID resourceID) { + this.resourceID = resourceID; + } + + public ResourceID getResourceID() { + return resourceID; + } + + @Override + public String toString() { + return "NotifyResourceStarted{" + + ", resourceID=" + resourceID + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java deleted file mode 100644 index bad51f0..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResource.java +++ 
/dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework.messages; - -import akka.actor.ActorRef; -import org.apache.flink.runtime.messages.RegistrationMessages; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - -/** - * Triggers a lookup at the ResourceManager to check if the resource for a TaskManager is registered. 
- */ -public class RegisterResource implements RequiresLeaderSessionID, java.io.Serializable { - private static final long serialVersionUID = 1L; - - private final ActorRef taskManager; - private final RegistrationMessages.RegisterTaskManager registerMessage; - - - public RegisterResource(ActorRef taskManager, RegistrationMessages.RegisterTaskManager registerMessage) { - this.taskManager = taskManager; - this.registerMessage = registerMessage; - } - - public ActorRef getTaskManager() { - return taskManager; - } - - public RegistrationMessages.RegisterTaskManager getRegisterMessage() { - return registerMessage; - } - - @Override - public String toString() { - return "RegisterResource{" + - "taskManager=" + taskManager + - ", registerMessage=" + registerMessage + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java deleted file mode 100644 index a19c0ab..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceFailed.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework.messages; - -import akka.actor.ActorRef; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - -/** - * Answer to RegisterResource to indicate that the requested resource is unknown. - * Sent by the ResourceManager to the JobManager. - */ -public class RegisterResourceFailed implements RequiresLeaderSessionID, java.io.Serializable { - private static final long serialVersionUID = 1L; - - /** Task Manager which tried to register */ - private final ActorRef taskManager; - - /** The id of the task manager resource */ - private final ResourceID resourceID; - - /** Error message */ - private final String message; - - public RegisterResourceFailed(ActorRef taskManager, ResourceID resourceId, String message) { - this.taskManager = taskManager; - this.resourceID = resourceId; - this.message = message; - } - - - public String getMessage() { - return message; - } - - public ActorRef getTaskManager() { - return taskManager; - } - - public ResourceID getResourceID() { - return resourceID; - } - - @Override - public String toString() { - return "RegisterResourceFailed{" + - "taskManager=" + taskManager + - ", resourceID=" + resourceID + - ", message='" + message + '\'' + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java deleted file mode 100644 index c29d28d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceSuccessful.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework.messages; - -import akka.actor.ActorRef; -import org.apache.flink.runtime.messages.RegistrationMessages; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - - -/** - * Answer to RegisterResource to indicate that the requested resource is known. - * Sent by the ResourceManager to the JobManager. 
- */ -public class RegisterResourceSuccessful implements RequiresLeaderSessionID, java.io.Serializable { - private static final long serialVersionUID = 1L; - - private final ActorRef taskManager; - private final RegistrationMessages.RegisterTaskManager registrationMessage; - - public RegisterResourceSuccessful(ActorRef taskManager, - RegistrationMessages.RegisterTaskManager registrationMessage) { - this.taskManager = taskManager; - this.registrationMessage = registrationMessage; - } - - - public ActorRef getTaskManager() { - return taskManager; - } - - public RegistrationMessages.RegisterTaskManager getRegistrationMessage() { - return registrationMessage; - } - - @Override - public String toString() { - return "RegisterResourceSuccessful{" + - "taskManager=" + taskManager + - ", registrationMessage=" + registrationMessage + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java index 4626461..89a602e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java @@ -83,13 +83,13 @@ public class StandaloneResourceManager extends FlinkResourceManager<ResourceID> } @Override - protected ResourceID workerRegistered(ResourceID resourceID) { + protected ResourceID workerStarted(ResourceID resourceID) { // we accept everything return resourceID; } @Override - protected void releaseRegisteredWorker(ResourceID resourceID) { + protected void 
releaseStartedWorker(ResourceID resourceID) { // cannot release any workers, they simply stay } http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java index 2422925..495cacd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/testutils/TestingResourceManager.java @@ -58,7 +58,7 @@ public class TestingResourceManager extends StandaloneResourceManager { protected void handleMessage(Object message) { if (message instanceof GetRegisteredResources) { - sender().tell(new GetRegisteredResourcesReply(getRegisteredTaskManagers()), self()); + sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self()); } else if (message instanceof FailResource) { ResourceID resourceID = ((FailResource) message).resourceID; notifyWorkerFailed(resourceID, "Failed for test case."); http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0026bef..f14a37f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -400,36 +400,23 @@ class JobManager( currentResourceManager match { case Some(rm) => - val future = (rm ? 
decorateMessage(new RegisterResource(taskManager, msg)))(timeout) - future.onComplete { - case scala.util.Success(response) => - // the resource manager is available and answered - self ! response - case scala.util.Failure(t) => + val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) + future.onFailure { + case t: Throwable => t match { case _: TimeoutException => log.info("Attempt to register resource at ResourceManager timed out. Retrying") case _ => log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t) } - // slow or unreachable resource manager, register anyway and let the rm reconnect - self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) self ! decorateMessage(new ReconnectResourceManager(rm)) }(context.dispatcher) case None => log.info("Task Manager Registration but not connected to ResourceManager") - // ResourceManager not yet available - // sending task manager information later upon ResourceManager registration - self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg)) } - case msg: RegisterResourceSuccessful => - - val originalMsg = msg.getRegistrationMessage - val taskManager = msg.getTaskManager - - // ResourceManager knows about the resource, now let's try to register TaskManager + // ResourceManager is told about the resource, now let's try to register TaskManager if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId @@ -441,10 +428,10 @@ class JobManager( try { val instanceID = instanceManager.registerTaskManager( taskManager, - originalMsg.resourceId, - originalMsg.connectionInfo, - originalMsg.resources, - originalMsg.numberOfSlots, + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots, leaderSessionID.orNull) taskManager ! 
decorateMessage( @@ -463,24 +450,18 @@ class JobManager( } } - case msg: RegisterResourceFailed => - - val taskManager = msg.getTaskManager - val resourceId = msg.getResourceID - log.warn(s"TaskManager's resource id $resourceId failed to register at ResourceManager. " + - s"Refusing registration because of\n${msg.getMessage}.") - - taskManager ! decorateMessage( - RefuseRegistration(new IllegalStateException( - s"Resource $resourceId not registered with resource manager."))) - case msg: ResourceRemoved => // we're being informed by the resource manager that a resource has become unavailable val resourceID = msg.resourceId() log.debug(s"Resource has been removed: $resourceID") - val instance = instanceManager.getRegisteredInstance(resourceID) - // trigger removal of task manager - handleTaskManagerTerminated(instance.getActorGateway.actor()) + + Option(instanceManager.getRegisteredInstance(resourceID)) match { + case Some(instance) => + // trigger removal of task manager + handleTaskManagerTerminated(instance.getActorGateway.actor()) + case None => + log.debug(s"Resource $resourceID has not been registered at job manager.") + } case RequestNumberRegisteredTaskManager => sender ! 
decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers) http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala index b48bcf9..d362164 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala @@ -18,9 +18,11 @@ package org.apache.flink.runtime.messages +import java.util.UUID + import akka.actor.ActorRef import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription} +import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID} import scala.concurrent.duration.{Deadline, FiniteDuration} @@ -42,12 +44,14 @@ object RegistrationMessages { * @param timeout The timeout for the message. The next retry will double this timeout. * @param deadline Optional deadline until when the registration must be completed. * @param attempt The attempt number, for logging. 
+ * @param registrationRun UUID of the current registration run to filter out outdated runs */ case class TriggerTaskManagerRegistration( jobManagerURL: String, timeout: FiniteDuration, deadline: Option[Deadline], - attempt: Int) + attempt: Int, + registrationRun: UUID) extends RegistrationMessage /** http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index dcf1e38..a7dd789 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -189,6 +189,10 @@ class TaskManager( connectionInfo.getHostname(), new UnmodifiableConfiguration(config.configuration), config.tmpDirPaths) + + private var scheduledTaskManagerRegistration: Option[Cancellable] = None + private var currentRegistrationRun: UUID = UUID.randomUUID() + // -------------------------------------------------------------------------- // Actor messages and life cycle // -------------------------------------------------------------------------- @@ -563,54 +567,60 @@ class TaskManager( jobManagerURL, timeout, deadline, - attempt) => + attempt, + registrationRun) => + + if (registrationRun.equals(this.currentRegistrationRun)) { + if (isConnected) { + // this may be the case, if we queue another attempt and + // in the meantime, the registration is acknowledged + log.debug( + "TaskManager was triggered to register at JobManager, but is already registered") + } else if (deadline.exists(_.isOverdue())) { + // we failed to register in time. 
that means we should quit + log.error("Failed to register at the JobManager withing the defined maximum " + + "connect time. Shutting down ...") + + // terminate ourselves (hasta la vista) + self ! decorateMessage(PoisonPill) + } else { + if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) { + throw new Exception("Invalid internal state: Trying to register at JobManager " + + s"$jobManagerURL even though the current JobManagerAkkaURL " + + s"is set to ${jobManagerAkkaURL.getOrElse("")}") + } - if (isConnected) { - // this may be the case, if we queue another attempt and - // in the meantime, the registration is acknowledged - log.debug( - "TaskManager was triggered to register at JobManager, but is already registered") - } else if (deadline.exists(_.isOverdue())) { - // we failed to register in time. that means we should quit - log.error("Failed to register at the JobManager withing the defined maximum " + - "connect time. Shutting down ...") - - // terminate ourselves (hasta la vista) - self ! decorateMessage(PoisonPill) - } else { - if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) { - throw new Exception("Invalid internal state: Trying to register at JobManager " + - s"$jobManagerURL even though the current JobManagerAkkaURL is set to " + - s"${jobManagerAkkaURL.getOrElse("")}") - } + log.info(s"Trying to register at JobManager $jobManagerURL " + + s"(attempt $attempt, timeout: $timeout)") + + val jobManager = context.actorSelection(jobManagerURL) - log.info(s"Trying to register at JobManager $jobManagerURL " + - s"(attempt $attempt, timeout: $timeout)") - - val jobManager = context.actorSelection(jobManagerURL) - - jobManager ! 
decorateMessage( - RegisterTaskManager( - resourceID, - connectionInfo, - resources, - numberOfSlots) - ) - - // the next timeout computes via exponential backoff with cap - val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT) - - // schedule (with our timeout s delay) a check triggers a new registration - // attempt, if we are not registered by then - context.system.scheduler.scheduleOnce( - timeout, - self, - decorateMessage(TriggerTaskManagerRegistration( - jobManagerURL, - nextTimeout, - deadline, - attempt + 1) - ))(context.dispatcher) + jobManager ! decorateMessage( + RegisterTaskManager( + resourceID, + connectionInfo, + resources, + numberOfSlots) + ) + + // the next timeout computes via exponential backoff with cap + val nextTimeout = (timeout * 2).min(config.maxRegistrationPause) + + // schedule (with our timeout s delay) a check triggers a new registration + // attempt, if we are not registered by then + scheduledTaskManagerRegistration = Option(context.system.scheduler.scheduleOnce( + timeout, + self, + decorateMessage(TriggerTaskManagerRegistration( + jobManagerURL, + nextTimeout, + deadline, + attempt + 1, + registrationRun) + ))(context.dispatcher)) + } + } else { + log.info(s"Discarding registration run with ID $registrationRun") } // successful registration. associate with the JobManager @@ -668,20 +678,27 @@ class TaskManager( if(jobManagerAkkaURL.isDefined) { // try the registration again after some time - val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION + val delay: FiniteDuration = config.refusedRegistrationPause val deadline: Option[Deadline] = config.maxRegistrationDuration.map { timeout => timeout + delay fromNow } - context.system.scheduler.scheduleOnce(delay) { - self ! 
decorateMessage( - TriggerTaskManagerRegistration( - jobManagerAkkaURL.get, - TaskManager.INITIAL_REGISTRATION_TIMEOUT, - deadline, - 1) - ) - }(context.dispatcher) + // start a new registration run + currentRegistrationRun = UUID.randomUUID() + + scheduledTaskManagerRegistration.foreach(_.cancel()) + + scheduledTaskManagerRegistration = Option( + context.system.scheduler.scheduleOnce(delay) { + self ! decorateMessage( + TriggerTaskManagerRegistration( + jobManagerAkkaURL.get, + config.initialRegistrationPause, + deadline, + 1, + currentRegistrationRun) + ) + }(context.dispatcher)) } } else { // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration @@ -1359,12 +1376,18 @@ class TaskManager( // begin attempts to reconnect val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow) + // start a new registration run + currentRegistrationRun = UUID.randomUUID() + + scheduledTaskManagerRegistration.foreach(_.cancel()) + self ! decorateMessage( TriggerTaskManagerRegistration( jobManagerAkkaURL.get, - TaskManager.INITIAL_REGISTRATION_TIMEOUT, + config.initialRegistrationPause, deadline, - 1) + 1, + currentRegistrationRun) ) } } @@ -1412,13 +1435,6 @@ object TaskManager { * connection attempts */ val STARTUP_CONNECT_LOG_SUPPRESS = 10000L - /** The initial time for registration of the TaskManager with the JobManager */ - val INITIAL_REGISTRATION_TIMEOUT: FiniteDuration = 500 milliseconds - /** The maximum time for registration of the TaskManager with the JobManager */ - val MAX_REGISTRATION_TIMEOUT: FiniteDuration = 30 seconds - - val DELAY_AFTER_REFUSED_REGISTRATION: FiniteDuration = 10 seconds - val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds @@ -2116,13 +2132,67 @@ object TaskManager { e) } + val initialRegistrationPause = try { + val pause = Duration(configuration.getString( + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE + )) + + if 
(pause.isFinite()) { + pause.asInstanceOf[FiniteDuration] + } else { + throw new IllegalArgumentException(s"The initial registration pause must be finite: $pause") + } + } catch { + case e: NumberFormatException => throw new IllegalArgumentException( + "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + e) + } + + val maxRegistrationPause = try { + val pause = Duration(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE + )) + + if (pause.isFinite()) { + pause.asInstanceOf[FiniteDuration] + } else { + throw new IllegalArgumentException(s"The maximum registration pause must be finite: $pause") + } + } catch { + case e: NumberFormatException => throw new IllegalArgumentException( + "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + e) + } + + val refusedRegistrationPause = try { + val pause = Duration(configuration.getString( + ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE + )) + + if (pause.isFinite()) { + pause.asInstanceOf[FiniteDuration] + } else { + throw new IllegalArgumentException(s"The refused registration pause must be finite: $pause") + } + } catch { + case e: NumberFormatException => throw new IllegalArgumentException( + "Invalid format for parameter " + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + e) + } + val taskManagerConfig = TaskManagerConfiguration( tmpDirs, cleanupInterval, timeout, finiteRegistrationDuration, slots, - configuration) + configuration, + initialRegistrationPause, + maxRegistrationPause, + refusedRegistrationPause) (taskManagerConfig, networkConfig, connectionInfo, memType) } http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala index 03e8e63..aab3c5f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.taskmanager +import java.util.concurrent.TimeUnit + import org.apache.flink.configuration.Configuration import scala.concurrent.duration.FiniteDuration @@ -28,4 +30,27 @@ case class TaskManagerConfiguration( timeout: FiniteDuration, maxRegistrationDuration: Option[FiniteDuration], numberOfSlots: Int, - configuration: Configuration) + configuration: Configuration, + initialRegistrationPause: FiniteDuration, + maxRegistrationPause: FiniteDuration, + refusedRegistrationPause: FiniteDuration) { + + def this( + tmpDirPaths: Array[String], + cleanupInterval: Long, + timeout: FiniteDuration, + maxRegistrationDuration: Option[FiniteDuration], + numberOfSlots: Int, + configuration: Configuration) { + this ( + tmpDirPaths, + cleanupInterval, + timeout, + maxRegistrationDuration, + numberOfSlots, + configuration, + FiniteDuration(500, TimeUnit.MILLISECONDS), + FiniteDuration(30, TimeUnit.SECONDS), + FiniteDuration(10, TimeUnit.SECONDS)) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 2fe830f..9640fcd 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -22,7 +22,6 @@ import akka.actor.{ActorRef, Cancellable, Terminated} import akka.pattern.{ask, pipe} import org.apache.flink.api.common.JobID import org.apache.flink.runtime.FlinkActor -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager @@ -339,10 +338,10 @@ trait TestingJobManagerLike extends FlinkActor { } // TaskManager may be registered on these two messages - case msg @ (_: RegisterTaskManager | _: RegisterResourceSuccessful) => + case msg @ (_: RegisterTaskManager) => super.handleMessage(msg) - // dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or + // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or // fewer registered TaskManagers while (waitForNumRegisteredTaskManagers.nonEmpty && waitForNumRegisteredTaskManagers.head._1 <= http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java index c5eb155..d6bcaaf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java @@ -30,10 +30,10 @@ import java.util.UUID; */ public class 
TestingLeaderRetrievalService implements LeaderRetrievalService { - private final String leaderAddress; - private final UUID leaderSessionID; + private volatile String leaderAddress; + private volatile UUID leaderSessionID; - private LeaderRetrievalListener listener; + private volatile LeaderRetrievalListener listener; public TestingLeaderRetrievalService() { this(null, null); @@ -59,6 +59,9 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService { } public void notifyListener(String address, UUID leaderSessionID) { + this.leaderAddress = address; + this.leaderSessionID = leaderSessionID; + if (listener != null) { listener.notifyLeaderAddress(address, leaderSessionID); } http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index d75341f..043c81c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -18,32 +18,26 @@ package org.apache.flink.runtime.resourcemanager; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.messages.RegisterResource; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; import org.apache.flink.runtime.clusterframework.messages.RemoveResource; import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.RegistrationMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.TestingResourceManager; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; import scala.Option; import java.util.ArrayList; @@ -198,15 +192,10 @@ public class ResourceManagerTest { ResourceID resourceID = ResourceID.generate(); // Send task manager registration - resourceManager.tell(new RegisterResource( - ActorRef.noSender(), - new RegistrationMessages.RegisterTaskManager(resourceID, - Mockito.mock(InstanceConnectionInfo.class), - null, - 1)), + resourceManager.tell(new NotifyResourceStarted(resourceID), fakeJobManager); - expectMsgClass(RegisterResourceSuccessful.class); + expectMsgClass(Acknowledge.class); // check for number registration of registered resources resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); @@ -216,15 +205,10 @@ public class ResourceManagerTest { assertEquals(1, reply.resources.size()); // Send task manager registration again - resourceManager.tell(new RegisterResource( 
- ActorRef.noSender(), - new RegistrationMessages.RegisterTaskManager(resourceID, - Mockito.mock(InstanceConnectionInfo.class), - null, - 1)), + resourceManager.tell(new NotifyResourceStarted(resourceID), fakeJobManager); - expectMsgClass(RegisterResourceSuccessful.class); + expectMsgClass(Acknowledge.class); // check for number registration of registered resources resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); @@ -232,17 +216,11 @@ public class ResourceManagerTest { assertEquals(1, reply.resources.size()); - // Send invalid null resource id to throw an exception during resource registration - resourceManager.tell(new RegisterResource( - ActorRef.noSender(), - new RegistrationMessages.RegisterTaskManager(null, - Mockito.mock(InstanceConnectionInfo.class), - null, - 1)), + resourceManager.tell(new NotifyResourceStarted(null), fakeJobManager); - expectMsgClass(RegisterResourceFailed.class); + expectMsgClass(Acknowledge.class); // check for number registration of registered resources resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); @@ -275,15 +253,10 @@ public class ResourceManagerTest { resourceManager.tell(new RemoveResource(resourceID), fakeJobManager); // Send task manager registration - resourceManager.tell(new RegisterResource( - ActorRef.noSender(), - new RegistrationMessages.RegisterTaskManager(resourceID, - Mockito.mock(InstanceConnectionInfo.class), - null, - 1)), + resourceManager.tell(new NotifyResourceStarted(resourceID), fakeJobManager); - expectMsgClass(RegisterResourceSuccessful.class); + expectMsgClass(Acknowledge.class); // check for number registration of registered resources resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); @@ -329,25 +302,16 @@ public class ResourceManagerTest { ResourceID resourceID2 = ResourceID.generate(); // Send task manager registration - resourceManager.tell(new RegisterResource( - 
ActorRef.noSender(), - new RegistrationMessages.RegisterTaskManager(resourceID1, - Mockito.mock(InstanceConnectionInfo.class), - null, - 1)), + resourceManager.tell(new NotifyResourceStarted(resourceID1), fakeJobManager); + expectMsgClass(Acknowledge.class); + // Send task manager registration - resourceManager.tell(new RegisterResource( - ActorRef.noSender(), - new RegistrationMessages.RegisterTaskManager(resourceID2, - Mockito.mock(InstanceConnectionInfo.class), - null, - 1)), + resourceManager.tell(new NotifyResourceStarted(resourceID2), fakeJobManager); - expectMsgClass(RegisterResourceSuccessful.class); - expectMsgClass(RegisterResourceSuccessful.class); + expectMsgClass(Acknowledge.class); // check for number registration of registered resources resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), fakeJobManager); http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 685fa9a..e23aba7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -50,6 +50,7 @@ import scala.Option; import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; @@ -286,14 +287,18 @@ public class TaskManagerRegistrationTest extends TestLogger { jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty()); final ActorGateway jmGateway = jm; + 
FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS); + Configuration tmConfig = new Configuration(config); + tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString()); + // we make the test actor (the test kit) the JobManager to intercept // the messages taskManager = createTaskManager( - actorSystem, - jmGateway, - config, - true, - false); + actorSystem, + jmGateway, + tmConfig, + true, + false); final ActorGateway taskManagerGateway = taskManager; @@ -312,8 +317,10 @@ public class TaskManagerRegistrationTest extends TestLogger { } }; + + // the TaskManager should wait a bit an retry... - FiniteDuration maxDelay = (FiniteDuration) TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0); + FiniteDuration maxDelay = (FiniteDuration) refusedRegistrationPause.$times(3.0); new Within(maxDelay) { @Override @@ -333,6 +340,94 @@ public class TaskManagerRegistrationTest extends TestLogger { } /** + * Tests that the TaskManager does not send an excessive amount of registration messages to + * the job manager if its registration was rejected. 
+ */ + @Test + public void testTaskManagerNoExcessiveRegistrationMessages() throws Exception { + new JavaTestKit(actorSystem) {{ + ActorGateway jm = null; + ActorGateway taskManager =null; + try { + FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS); + + jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty()); + final ActorGateway jmGateway = jm; + + long refusedRegistrationPause = 500; + long initialRegistrationPause = 100; + long maxDelay = 30000; + + Configuration tmConfig = new Configuration(config); + tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms"); + tmConfig.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms"); + + // we make the test actor (the test kit) the JobManager to intercept + // the messages + taskManager = createTaskManager( + actorSystem, + jmGateway, + tmConfig, + true, + false); + + final ActorGateway taskManagerGateway = taskManager; + + final Deadline deadline = timeout.fromNow(); + + try { + while (deadline.hasTimeLeft()) { + // the TaskManager should try to register + expectMsgClass(deadline.timeLeft(), RegisterTaskManager.class); + + // we decline the registration + taskManagerGateway.tell( + new RefuseRegistration(new Exception("test reason")), + jmGateway); + } + } catch (AssertionError error) { + // ignore since it simply means that we have used up all our time + } + + RegisterTaskManager[] registerTaskManagerMessages = new ReceiveWhile<RegisterTaskManager>(RegisterTaskManager.class, timeout) { + @Override + protected RegisterTaskManager match(Object msg) throws Exception { + if (msg instanceof RegisterTaskManager) { + return (RegisterTaskManager) msg; + } else { + throw noMatch(); + } + } + }.get(); + + int maxExponent = (int) Math.floor(Math.log(((double) maxDelay / initialRegistrationPause + 1))/Math.log(2)); + int exponent = (int) Math.ceil(Math.log(((double) 
timeout.toMillis() / initialRegistrationPause + 1))/Math.log(2)); + + int exp = Math.min(maxExponent, exponent); + + long difference = timeout.toMillis() - (initialRegistrationPause * (1 << exp)); + + int numberRegisterTaskManagerMessages = exp; + + if (difference > 0) { + numberRegisterTaskManagerMessages += Math.ceil((double) difference / maxDelay); + } + + int maxExpectedNumberOfRegisterTaskManagerMessages = numberRegisterTaskManagerMessages * 2; + + assertTrue("The number of RegisterTaskManager messages #" + + registerTaskManagerMessages.length + + " should be less than #" + + maxExpectedNumberOfRegisterTaskManagerMessages, + registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages); + } finally { + stopActor(taskManager); + stopActor(jm); + } + }}; + } + + /** * Validate that the TaskManager attempts to re-connect after it lost the connection * to the JobManager. */ http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index dcb5334..94f9348 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -65,6 +65,14 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-yarn_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>${shading-artifact.name}</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java index 3ed0dc1..a3337bb 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java @@ -43,8 +43,13 @@ public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " + "jar. Make sure to package the flink-runtime module."); + File testingYarnJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn")); + Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-yarn tests " + + "jar. 
Make sure to package the flink-yarn module."); + filesToShip.add(testingJar); filesToShip.add(testingRuntimeJar); + filesToShip.add(testingYarnJar); addShipFiles(filesToShip); } http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java deleted file mode 100644 index 5a61b8f..0000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.conf.YarnConfiguration; - -/** - * Flink's testing resource manager for Yarn. - */ -public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager { - - public TestingYarnFlinkResourceManager( - Configuration flinkConfig, - YarnConfiguration yarnConfig, - LeaderRetrievalService leaderRetrievalService, - String applicationMasterHostName, - String webInterfaceURL, - ContaineredTaskManagerParameters taskManagerParameters, - ContainerLaunchContext taskManagerLaunchContext, - int yarnHeartbeatIntervalMillis, - int maxFailedContainers, - int numInitialTaskManagers) { - - super( - flinkConfig, - yarnConfig, - leaderRetrievalService, - applicationMasterHostName, - webInterfaceURL, - taskManagerParameters, - taskManagerLaunchContext, - yarnHeartbeatIntervalMillis, - maxFailedContainers, - numInitialTaskManagers); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 12db578..b770c63 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -43,7 +43,7 @@ under the License. </exclusion> </exclusions> </dependency> - + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.10</artifactId> @@ -64,7 +64,6 @@ under the License. <scope>test</scope> </dependency> - <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_${scala.binary.version}</artifactId> @@ -85,8 +84,20 @@ under the License. 
<artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> - - + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-testkit_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -197,6 +208,18 @@ under the License. <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java index 974c4df..cb2f40a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java @@ -30,7 +30,6 @@ import static java.util.Objects.requireNonNull; */ public class RegisteredYarnWorkerNode implements ResourceIDRetrievable { - /** The container on which the worker runs */ private final Container yarnContainer; http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 883a860..3c85795 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.messages.ContainersAllocated; import org.apache.flink.yarn.messages.ContainersComplete; @@ -122,18 +123,75 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar private RegisterApplicationMasterResponseReflector applicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(LOG); + public YarnFlinkResourceManager( + Configuration flinkConfig, + YarnConfiguration yarnConfig, + LeaderRetrievalService leaderRetrievalService, + String applicationMasterHostName, + String webInterfaceURL, + ContaineredTaskManagerParameters taskManagerParameters, + ContainerLaunchContext taskManagerLaunchContext, + int yarnHeartbeatIntervalMillis, + int maxFailedContainers, + int numInitialTaskManagers) { + + this( + flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers, + new YarnResourceManagerCallbackHandler()); + } public YarnFlinkResourceManager( - Configuration flinkConfig, - YarnConfiguration yarnConfig, - LeaderRetrievalService leaderRetrievalService, - String applicationMasterHostName, - String webInterfaceURL, - ContaineredTaskManagerParameters taskManagerParameters, - ContainerLaunchContext 
taskManagerLaunchContext, - int yarnHeartbeatIntervalMillis, - int maxFailedContainers, - int numInitialTaskManagers) { + Configuration flinkConfig, + YarnConfiguration yarnConfig, + LeaderRetrievalService leaderRetrievalService, + String applicationMasterHostName, + String webInterfaceURL, + ContaineredTaskManagerParameters taskManagerParameters, + ContainerLaunchContext taskManagerLaunchContext, + int yarnHeartbeatIntervalMillis, + int maxFailedContainers, + int numInitialTaskManagers, + YarnResourceManagerCallbackHandler callbackHandler) { + + this( + flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers, + callbackHandler, + AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, callbackHandler), + NMClient.createNMClient()); + } + + public YarnFlinkResourceManager( + Configuration flinkConfig, + YarnConfiguration yarnConfig, + LeaderRetrievalService leaderRetrievalService, + String applicationMasterHostName, + String webInterfaceURL, + ContaineredTaskManagerParameters taskManagerParameters, + ContainerLaunchContext taskManagerLaunchContext, + int yarnHeartbeatIntervalMillis, + int maxFailedContainers, + int numInitialTaskManagers, + YarnResourceManagerCallbackHandler callbackHandler, + AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient, + NMClient nodeManagerClient) { super(numInitialTaskManagers, flinkConfig, leaderRetrievalService); @@ -145,6 +203,10 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMillis; this.maxFailedContainers = maxFailedContainers; + this.resourceManagerCallbackHandler = Preconditions.checkNotNull(callbackHandler); + this.resourceManagerClient = Preconditions.checkNotNull(resourceManagerClient); + this.nodeManagerClient = 
Preconditions.checkNotNull(nodeManagerClient); + this.containersInLaunch = new HashMap<>(); this.containersBeingReturned = new HashMap<>(); } @@ -178,16 +240,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar protected void initialize() throws Exception { LOG.info("Initializing YARN resource master"); - // create the client to communicate with the ResourceManager - resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(self()); + resourceManagerCallbackHandler.initialize(self()); - resourceManagerClient = AMRMClientAsync.createAMRMClientAsync( - yarnHeartbeatIntervalMillis, resourceManagerCallbackHandler); resourceManagerClient.init(yarnConfig); resourceManagerClient.start(); // create the client to communicate with the node managers - nodeManagerClient = NMClient.createNMClient(); nodeManagerClient.init(yarnConfig); nodeManagerClient.start(); nodeManagerClient.cleanupRunningContainersOnStop(true); @@ -277,7 +335,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar Priority priority = Priority.newInstance(0); // Resource requirements for worker containers - int taskManagerSlots = Integer.valueOf(System.getenv(YarnConfigKeys.ENV_SLOTS)); + int taskManagerSlots = taskManagerParameters.numSlots(); int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1)); Resource capability = Resource.newInstance(containerMemorySizeMB, vcores); @@ -300,7 +358,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar } @Override - protected void releaseRegisteredWorker(RegisteredYarnWorkerNode worker) { + protected void releaseStartedWorker(RegisteredYarnWorkerNode worker) { releaseYarnContainer(worker.yarnContainer()); } @@ -323,10 +381,13 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar } @Override - protected RegisteredYarnWorkerNode workerRegistered(ResourceID resourceID) throws Exception { + protected 
RegisteredYarnWorkerNode workerStarted(ResourceID resourceID) { YarnContainerInLaunch inLaunch = containersInLaunch.remove(resourceID); if (inLaunch == null) { - throw new Exception("Cannot register Worker - unknown resource id " + resourceID); + // Container was not in state "being launched", this can indicate that the TaskManager + // in this container was already registered or that the container was not started + // by this resource manager. Simply ignore this resourceID. + return null; } else { return new RegisteredYarnWorkerNode(inLaunch.container()); } @@ -345,8 +406,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar accepted.add(new RegisteredYarnWorkerNode(yci.container())); } else { - LOG.info("YARN container consolidation does not recognize TaskManager {}", - resourceID); + if (isStarted(resourceID)) { + LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID); + } else { + LOG.info("YARN container consolidation does not recognize TaskManager {}", + resourceID); + } } } return accepted; @@ -368,7 +433,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar private void containersAllocated(List<Container> containers) { final int numRequired = getDesignatedWorkerPoolSize(); - final int numRegistered = getNumberOfRegisteredTaskManagers(); + final int numRegistered = getNumberOfStartedTaskManagers(); for (Container container : containers) { numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); @@ -519,7 +584,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar private void updateProgress() { final int required = getDesignatedWorkerPoolSize(); - final int available = getNumberOfRegisteredTaskManagers() + containersInLaunch.size(); + final int available = getNumberOfStartedTaskManagers() + containersInLaunch.size(); final float progress = (required <= 0) ? 
1.0f : available / (float) required; if (resourceManagerCallbackHandler != null) { @@ -583,7 +648,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar * @return A list with containers from previous application attempt. */ private List<Container> getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) { - if (method != null) { + if (method != null && response != null) { try { @SuppressWarnings("unchecked") List<Container> list = (List<Container>) method.invoke(response); http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java index 1e287c2..2372cbc 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerCallbackHandler.java @@ -42,12 +42,19 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb /** The progress we report */ private float currentProgress; - + + public YarnResourceManagerCallbackHandler() { + this(null); + } public YarnResourceManagerCallbackHandler(ActorRef yarnFrameworkMaster) { this.yarnFrameworkMaster = yarnFrameworkMaster; } + public void initialize(ActorRef yarnFrameworkMaster) { + this.yarnFrameworkMaster = yarnFrameworkMaster; + } + /** * Sets the current progress. * @param progress The current progress fraction. 
@@ -65,16 +72,20 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb @Override public void onContainersCompleted(List<ContainerStatus> list) { - yarnFrameworkMaster.tell( - new ContainersComplete(list), - ActorRef.noSender()); + if (yarnFrameworkMaster != null) { + yarnFrameworkMaster.tell( + new ContainersComplete(list), + ActorRef.noSender()); + } } @Override public void onContainersAllocated(List<Container> containers) { - yarnFrameworkMaster.tell( - new ContainersAllocated(containers), - ActorRef.noSender()); + if (yarnFrameworkMaster != null) { + yarnFrameworkMaster.tell( + new ContainersAllocated(containers), + ActorRef.noSender()); + } } @Override @@ -89,8 +100,10 @@ public class YarnResourceManagerCallbackHandler implements AMRMClientAsync.Callb @Override public void onError(Throwable error) { - yarnFrameworkMaster.tell( - new FatalErrorOccurred("Connection to YARN Resource Manager failed", error), - ActorRef.noSender()); + if (yarnFrameworkMaster != null) { + yarnFrameworkMaster.tell( + new FatalErrorOccurred("Connection to YARN Resource Manager failed", error), + ActorRef.noSender()); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java new file mode 100644 index 0000000..f03c604 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; +import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.util.Comparator; +import java.util.PriorityQueue; + +public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager { + + private final PriorityQueue<Tuple2<Integer, ActorRef>> waitingQueue = new PriorityQueue<>(32, new Comparator<Tuple2<Integer, ActorRef>>() { + @Override + public int compare(Tuple2<Integer, ActorRef> o1, Tuple2<Integer, ActorRef> o2) { + return o1.f0 - o2.f0; + } + }); + + public TestingYarnFlinkResourceManager( + Configuration flinkConfig, + YarnConfiguration yarnConfig, + LeaderRetrievalService 
leaderRetrievalService, + String applicationMasterHostName, + String webInterfaceURL, + ContaineredTaskManagerParameters taskManagerParameters, + ContainerLaunchContext taskManagerLaunchContext, + int yarnHeartbeatIntervalMillis, + int maxFailedContainers, + int numInitialTaskManagers) { + + super(flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers); + } + + public TestingYarnFlinkResourceManager( + Configuration flinkConfig, + YarnConfiguration yarnConfig, + LeaderRetrievalService leaderRetrievalService, + String applicationMasterHostName, + String webInterfaceURL, + ContaineredTaskManagerParameters taskManagerParameters, + ContainerLaunchContext taskManagerLaunchContext, + int yarnHeartbeatIntervalMillis, + int maxFailedContainers, + int numInitialTaskManagers, + YarnResourceManagerCallbackHandler callbackHandler, + AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient, + NMClient nodeManagerClient) { + super(flinkConfig, yarnConfig, leaderRetrievalService, applicationMasterHostName, webInterfaceURL, taskManagerParameters, taskManagerLaunchContext, yarnHeartbeatIntervalMillis, maxFailedContainers, numInitialTaskManagers, callbackHandler, resourceManagerClient, nodeManagerClient); + } + + @Override + protected void handleMessage(Object message) { + if (message instanceof RequestNumberOfRegisteredResources) { + getSender().tell(getNumberOfStartedTaskManagers(), getSelf()); + } else if (message instanceof NotifyWhenResourcesRegistered) { + NotifyWhenResourcesRegistered notifyMessage = (NotifyWhenResourcesRegistered) message; + + if (getNumberOfStartedTaskManagers() >= notifyMessage.getNumberResources()) { + getSender().tell(true, getSelf()); + } else { + waitingQueue.offer(Tuple2.of(notifyMessage.getNumberResources(), getSender())); + } + } else if (message instanceof 
NotifyResourceStarted) { + super.handleMessage(message); + + while (!waitingQueue.isEmpty() && waitingQueue.peek().f0 <= getNumberOfStartedTaskManagers()) { + ActorRef receiver = waitingQueue.poll().f1; + receiver.tell(true, getSelf()); + } + } else { + super.handleMessage(message); + } + } +}
