Repository: flink Updated Branches: refs/heads/master a5150a90c -> 517289dc5
[FLINK-1352] [runtime] Fix buggy registration of TaskManager to JobManager by introducing dedicated RefusedRegistration messages Adds exponential backoff strategy for TaskManager registration. Introduces AlreadyRegistered and RefuseRegistration messages. This closes #328. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/730e056a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/730e056a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/730e056a Branch: refs/heads/master Commit: 730e056a2a2ea028495637b633396392c31337e3 Parents: a5150a9 Author: Till Rohrmann <[email protected]> Authored: Wed Jan 21 12:45:12 2015 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Jan 27 14:33:27 2015 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 10 ++ .../flink/runtime/instance/InstanceManager.java | 4 + .../flink/runtime/jobmanager/JobManager.scala | 14 +- .../runtime/messages/RegistrationMessages.scala | 16 +++ .../flink/runtime/taskmanager/TaskManager.scala | 139 +++++++++++++------ .../taskmanager/TaskManagerConfiguration.scala | 4 +- .../TaskManagerRegistrationITCase.scala | 107 ++++++++++++++ .../runtime/testingUtils/TestingUtils.scala | 20 ++- .../apache/flink/yarn/ApplicationMaster.scala | 4 + 9 files changed, 266 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 969329e..a2f2c83 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -170,6 +170,11 @@ public final class ConfigConstants { public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = "taskmanager.debug.memory.logIntervalMs"; /** + * + */ + public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration"; + + /** * Parameter for the maximum fan for out-of-core algorithms. * Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out * for hybrid hash joins. @@ -488,6 +493,11 @@ public final class ConfigConstants { * The default number of task slots per task manager */ public static final int DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS = -1; + + /** + * The default task manager's maximum registration duration + */ + public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf"; /** * The default value for the JobClient's polling interval. 2 Seconds. http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index 3ce3ac7..64a761b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -226,6 +226,10 @@ public class InstanceManager { return new HashSet<Instance>(registeredHostsById.values()); } } + + public Instance getRegisteredInstance(ActorRef ref) { + return registeredHostsByConnection.get(ref); + } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 87c9745..c4bb793 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -122,10 +122,16 @@ class JobManager(val configuration: Configuration) val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) - // to be notified when the taskManager is no longer reachable - context.watch(taskManager) - - taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort) + // TaskManager is already registered + if(instanceID == null){ + val instanceID = instanceManager.getRegisteredInstance(taskManager).getId + taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort) + } else { + // to be notified when the taskManager is no longer reachable + context.watch(taskManager) + + taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort) + } } case RequestNumberRegisteredTaskManager => { http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala index 3a556dd..8d30741 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala @@ -43,4 +43,20 @@ object RegistrationMessages { */ case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int) + /** + * Denotes that the TaskManager has already been registered at the JobManager. + * + * @param instanceID + * @param blobPort + */ + case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int) + + /** + * Denotes the unsuccessful registration of a task manager at the job manager. This is the + * response triggered by the [[RegisterTaskManager]] message. + * + * @param reason Reason why the task manager registration was refused + */ + case class RefuseRegistration(reason: String) + } http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 0e953c2..17bc0ac 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -46,7 +46,8 @@ import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobID} import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState -import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, RegisterTaskManager} +import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, +RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager} import org.apache.flink.runtime.messages.TaskManagerMessages._ import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{MonitorTask, RegisterProfilingListener, UnmonitorTask} import org.apache.flink.runtime.net.NetUtils @@ -90,11 +91,11 @@ import scala.collection.JavaConverters._ log.info("Creating {} task slot(s).", numberOfSlots) log.info("TaskManager connection information {}.", connectionInfo) - val REGISTRATION_DELAY = 0 seconds - val REGISTRATION_INTERVAL = 10 seconds - val MAX_REGISTRATION_ATTEMPTS = 10 val HEARTBEAT_INTERVAL = 5000 millisecond + var registrationDelay = 50 milliseconds + var registrationDuration = 0 seconds + TaskManager.checkTempDirs(tmpDirPaths) val ioManager = new IOManagerAsync(tmpDirPaths) val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize) @@ -121,7 +122,6 @@ import scala.collection.JavaConverters._ var libraryCacheManager: LibraryCacheManager = null var networkEnvironment: Option[NetworkEnvironment] = None - var registrationScheduler: Option[Cancellable] = None var registrationAttempts: Int = 0 var registered: Boolean = false var currentJobManager = ActorRef.noSender @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._ } private def tryJobManagerRegistration(): Unit = { - registrationAttempts = 0 - import context.dispatcher - registrationScheduler = Some(context.system.scheduler.schedule( - TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL, - self, RegisterAtJobManager)) + registrationDuration = 0 seconds + + registered = false + + context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager) } override def receiveWithLogMessages: Receive = { case RegisterAtJobManager => { - registrationAttempts += 1 + if(!registered) { + registrationDuration += registrationDelay + // double delay for exponential backoff + registrationDelay *= 2 - if (registered) { - registrationScheduler.foreach(_.cancel()) - } - else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) { + if (registrationDuration > maxRegistrationDuration) { + log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL, - log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL, - registrationAttempts) - val jobManager = context.actorSelection(jobManagerAkkaURL) + maxRegistrationDuration) - jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) - } - else { - log.error("TaskManager could not register at JobManager."); - self ! PoisonPill + self ! PoisonPill + } else if (!registered) { + log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " + + s"Attempt") + val jobManager = context.actorSelection(jobManagerAkkaURL) + + jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) + + context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager) + } } } case AcknowledgeRegistration(id, blobPort) => { - if (!registered) { + if(!registered) { + finishRegistration(id, blobPort) registered = true - currentJobManager = sender - instanceID = id - - context.watch(currentJobManager) - - log.info("TaskManager successfully registered at JobManager {}.", - currentJobManager.path.toString) - - setupNetworkEnvironment() - setupLibraryCacheManager(blobPort) + } else { + if (log.isDebugEnabled) { + log.debug("The TaskManager {} is already registered at the JobManager {}, but received " + + "another AcknowledgeRegistration message.", self.path, currentJobManager.path) + } + } + } - heartbeatScheduler = Some(context.system.scheduler.schedule( - TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)) + case AlreadyRegistered(id, blobPort) => + if(!registered) { + log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" + + "though it has not yet finished the registration process.", self.path, sender.path) - profiler foreach { - _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager)) + finishRegistration(id, blobPort) + registered = true + } else { + // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration + if(log.isDebugEnabled){ + log.debug("The TaskManager {} has already been registered at the JobManager {}.", + self.path, sender.path) } + } - for (listener <- waitForRegistration) { - listener ! RegisteredAtJobManager - } + case RefuseRegistration(reason) => + if(!registered) { + log.error("The registration of task manager {} was refused by the job manager {} " + + "because {}.", self.path, jobManagerAkkaURL, reason) - waitForRegistration.clear() + // Shut task manager down + self ! PoisonPill + } else { + // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration + if(log.isDebugEnabled) { + log.debug("Received RefuseRegistration from the JobManager even though being already " + + "registered") + } } - } case SubmitTask(tdd) => { submitTask(tdd) @@ -454,7 +471,34 @@ import scala.collection.JavaConverters._ } } - def setupNetworkEnvironment(): Unit = { + private def finishRegistration(id: InstanceID, blobPort: Int): Unit = { + currentJobManager = sender + instanceID = id + + context.watch(currentJobManager) + + log.info(s"TaskManager successfully registered at JobManager ${ + currentJobManager.path.toString + }.") + + setupNetworkEnvironment() + setupLibraryCacheManager(blobPort) + + heartbeatScheduler = Some(context.system.scheduler.schedule( + TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)) + + profiler foreach { + _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager)) + } + + for (listener <- waitForRegistration) { + listener ! RegisteredAtJobManager + } + + waitForRegistration.clear() + } + + private def setupNetworkEnvironment(): Unit = { //shutdown existing network environment networkEnvironment foreach { ne => @@ -730,8 +774,13 @@ object TaskManager { val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + val maxRegistrationDuration = Duration(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)) + val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize, - tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout) + tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout, + maxRegistrationDuration) (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) } http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala index a6a76a3..82cbe9e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala @@ -18,10 +18,10 @@ package org.apache.flink.runtime.taskmanager -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int, tmpDirPaths: Array[String], cleanupInterval: Long, memoryLogggingIntervalMs: Option[Long], profilingInterval: Option[Long], - timeout: FiniteDuration) + timeout: FiniteDuration, maxRegistrationDuration: Duration) http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala new file mode 100644 index 0000000..e4c1efb --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager + +import java.net.InetAddress + +import akka.actor._ +import akka.testkit.{TestKit, ImplicitSender} +import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, InstanceConnectionInfo} +import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, +RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager} +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat +import org.apache.flink.runtime.testingUtils.TestingUtils +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with +ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "The JobManager" should { + "notify already registered TaskManagers" in { + + val jm = TestingUtils.startTestingJobManager + + val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1) + val hardwareDescription = HardwareDescription.extractFromSystem(10) + + try { + within(TestingUtils.TESTING_DURATION) { + jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1) + jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1) + + expectMsgType[AcknowledgeRegistration] + expectMsgType[AlreadyRegistered] + } + } finally { + jm ! Kill + } + } + } + + "The TaskManager" should { + "shutdown if its registration is refused by the JobManager" in { + + val tm = TestingUtils.startTestingTaskManager(self) + + watch(tm) + + try{ + within(TestingUtils.TESTING_DURATION) { + expectMsgType[RegisterTaskManager] + tm ! RefuseRegistration("Testing connection refusal") + + expectTerminated(tm) + } + } + } + + "ignore RefuseRegistration messages after it has been successfully registered" in { + + val tm = TestingUtils.startTestingTaskManager(self) + + try { + within(TestingUtils.TESTING_DURATION) { + expectMsgType[RegisterTaskManager] + + tm ! AcknowledgeRegistration(new InstanceID(), 42) + + tm ! RefuseRegistration("Should be ignored") + + // Check if the TaskManager is still alive + tm ! Identify + + expectMsgPF() { + // wait for actor identity + case x: ActorIdentity => true + // ignore heartbeats + case h: Heartbeat => false + } + } + } finally { + tm ! Kill + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 014a9ed..b658279 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.testingUtils -import akka.actor.{Props, ActorSystem} +import akka.actor.{ActorRef, Props, ActorSystem} import akka.testkit.CallingThreadDispatcher import com.typesafe.config.ConfigFactory import org.apache.flink.configuration.{ConfigConstants, Configuration} @@ -26,6 +26,7 @@ import org.apache.flink.core.io.IOReadableWritable import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue +import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import scala.concurrent.duration._ @@ -98,6 +99,23 @@ object TestingUtils { networkConnectionConfig) with TestingTaskManager)) } + def startTestingJobManager(implicit system: ActorSystem): ActorRef = { + val config = new Configuration() + + system.actorOf(Props(new JobManager(config) with TestingJobManager)) + } + + def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = { + val jmURL = jobManager.path.toString + val config = new Configuration() + config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, jmURL) + val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) = + TaskManager.parseConfiguration("LOCALHOST", config) + + system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, + networkConnectionConfig) with TestingTaskManager)) + } + def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT): FlinkMiniCluster = { val config = new Configuration() http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 37ae5ed..7c72ef4 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -39,6 +39,7 @@ object ApplicationMaster { val CONF_FILE = "flink-conf.yaml" val MODIFIED_CONF_FILE = "flink-conf-modified.yaml" + val MAX_REGISTRATION_DURATION = "5 minutes" def main(args: Array[String]): Unit ={ val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME) @@ -148,6 +149,9 @@ object ApplicationMaster { s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: ${slots*taskManagerCount}") } + output.println(s"${ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION}: " + + s"$MAX_REGISTRATION_DURATION") + // add dynamic properties val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
