[FLINK-1984] Port Mesos code to latest master: move Scala code to the /scala directory, remove merge commits, update version
This closes #2315 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/842e3e7d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/842e3e7d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/842e3e7d Branch: refs/heads/master Commit: 842e3e7d13c821fccd599642a417b6328915a366 Parents: 38a9534 Author: Maximilian Michels <[email protected]> Authored: Mon Aug 29 17:08:01 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Aug 29 17:32:51 2016 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 7 +- flink-mesos/pom.xml | 2 +- .../MesosApplicationMasterRunner.java | 8 +- .../store/ZooKeeperMesosWorkerStore.java | 2 +- .../ContaineredJobManager.scala | 172 ------------------- .../ContaineredJobManager.scala | 172 +++++++++++++++++++ 6 files changed, 182 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 2fe27e0..f0f1b6b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -742,6 +742,10 @@ public final class ConfigConstants { @PublicEvolving public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter"; + /** ZooKeeper root path (ZNode) for Mesos workers. 
*/ + @PublicEvolving + public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "high-availability.zookeeper.path.mesos-workers"; + @PublicEvolving public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout"; @@ -790,9 +794,6 @@ public final class ConfigConstants { @Deprecated public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter"; - /** ZooKeeper root path (ZNode) for Mesos workers. */ - public static final String ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers"; - /** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */ @Deprecated public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout"; http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-mesos/pom.xml ---------------------------------------------------------------------- diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml index 43b3195..a6edc0b 100644 --- a/flink-mesos/pom.xml +++ b/flink-mesos/pom.xml @@ -23,7 +23,7 @@ under the License.
<parent> <groupId>org.apache.flink</groupId> <artifactId>flink-parent</artifactId> - <version>1.1-SNAPSHOT</version> + <version>1.2-SNAPSHOT</version> <relativePath>..</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index c13cdf9..9916a87 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -38,9 +38,9 @@ import org.apache.flink.mesos.util.ZooKeeperUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -481,11 +481,11 @@ public class MesosApplicationMasterRunner { private static MesosWorkerStore createWorkerStore(Configuration flinkConfig) throws Exception { MesosWorkerStore workerStore; - RecoveryMode recoveryMode = RecoveryMode.fromConfig(flinkConfig); - if (recoveryMode == RecoveryMode.STANDALONE) { + HighAvailabilityMode recoveryMode = 
HighAvailabilityMode.fromConfig(flinkConfig); + if (recoveryMode == HighAvailabilityMode.NONE) { workerStore = new StandaloneMesosWorkerStore(); } - else if (recoveryMode == RecoveryMode.ZOOKEEPER) { + else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) { // note: the store is responsible for closing the client. CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig); workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig); http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java index 45553d4..c5cef8e 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java @@ -292,7 +292,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore"); String zooKeeperMesosWorkerStorePath = configuration.getString( - ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH, + ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH ); http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala deleted file mode 100644 index 45b404a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.clusterframework - -import java.util.concurrent.ExecutorService - -import akka.actor.ActorRef -import org.apache.flink.api.common.JobID -import org.apache.flink.configuration.{Configuration => FlinkConfiguration} -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore -import org.apache.flink.runtime.clusterframework.messages._ -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobgraph.JobStatus -import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} -import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} -import org.apache.flink.runtime.messages.Messages.Acknowledge -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} - -import scala.concurrent.duration._ -import scala.language.postfixOps - - -/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages - * to start/administer/stop the session. 
- * - * @param flinkConfiguration Configuration object for the actor - * @param executorService Execution context which is used to execute concurrent tasks in the - * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] - * @param instanceManager Instance manager to manage the registered - * [[org.apache.flink.runtime.taskmanager.TaskManager]] - * @param scheduler Scheduler to schedule Flink jobs - * @param libraryCacheManager Manager to manage uploaded jar files - * @param archive Archive for finished Flink jobs - * @param restartStrategyFactory Restart strategy to be used in case of a job recovery - * @param timeout Timeout for futures - * @param leaderElectionService LeaderElectionService to participate in the leader election - */ -abstract class ContaineredJobManager( - flinkConfiguration: FlinkConfiguration, - executorService: ExecutorService, - instanceManager: InstanceManager, - scheduler: FlinkScheduler, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore: SavepointStore, - jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[FlinkMetricRegistry]) - extends JobManager( - flinkConfiguration, - executorService, - instanceManager, - scheduler, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout, - metricsRegistry) { - - val jobPollingInterval: FiniteDuration - - // indicates if this JM has been started in a dedicated (per-job) mode. 
- var stopWhenJobFinished: JobID = null - - override def handleMessage: Receive = { - handleContainerMessage orElse super.handleMessage - } - - def handleContainerMessage: Receive = { - - case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) => - // forward to ResourceManager - currentResourceManager match { - case Some(rm) => - // we forward the message - rm.forward(decorateMessage(msg)) - case None => - // client has to try again - } - - case msg: ShutdownClusterAfterJob => - val jobId = msg.jobId() - log.info(s"ApplicationMaster will shut down session when job $jobId has finished.") - stopWhenJobFinished = jobId - // trigger regular job status messages (if this is a dedicated/per-job cluster) - if (stopWhenJobFinished != null) { - context.system.scheduler.schedule(0 seconds, - jobPollingInterval, - new Runnable { - override def run(): Unit = { - self ! decorateMessage(RequestJobStatus(stopWhenJobFinished)) - } - } - )(context.dispatcher) - } - - sender() ! decorateMessage(Acknowledge) - - case msg: GetClusterStatus => - sender() ! decorateMessage( - new GetClusterStatusResponse( - instanceManager.getNumberOfRegisteredTaskManagers, - instanceManager.getTotalNumberOfSlots) - ) - - case jnf: JobNotFound => - log.debug(s"Job with ID ${jnf.jobID} not found in JobManager") - if (stopWhenJobFinished == null) { - log.warn("The ApplicationMaster didn't expect to receive this message") - } - - case jobStatus: CurrentJobStatus => - if (stopWhenJobFinished == null) { - log.warn(s"Received job status $jobStatus which wasn't requested.") - } else { - if (stopWhenJobFinished != jobStatus.jobID) { - log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " + - s"job $stopWhenJobFinished") - } else { - if (jobStatus.status.isGloballyTerminalState) { - log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + - s"Shutting down session") - if (jobStatus.status == JobStatus.FINISHED) { - self ! 
decorateMessage( - new StopCluster( - ApplicationStatus.SUCCEEDED, - s"The monitored job with ID ${jobStatus.jobID} has finished.") - ) - } else { - self ! decorateMessage( - new StopCluster( - ApplicationStatus.FAILED, - s"The monitored job with ID ${jobStatus.jobID} has failed to complete.") - ) - } - } else { - log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala new file mode 100644 index 0000000..5f965d2 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.ExecutorService + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} +import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + */ +abstract class ContaineredJobManager( + flinkConfiguration: Configuration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) + extends JobManager( + flinkConfiguration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricsRegistry) { + + val jobPollingInterval: FiniteDuration + + // indicates if this JM has been started in a dedicated (per-job) mode. 
+ var stopWhenJobFinished: JobID = null + + override def handleMessage: Receive = { + handleContainerMessage orElse super.handleMessage + } + + def handleContainerMessage: Receive = { + + case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) => + // forward to ResourceManager + currentResourceManager match { + case Some(rm) => + // we forward the message + rm.forward(decorateMessage(msg)) + case None => + // client has to try again + } + + case msg: ShutdownClusterAfterJob => + val jobId = msg.jobId() + log.info(s"ApplicationMaster will shut down session when job $jobId has finished.") + stopWhenJobFinished = jobId + // trigger regular job status messages (if this is a dedicated/per-job cluster) + if (stopWhenJobFinished != null) { + context.system.scheduler.schedule(0 seconds, + jobPollingInterval, + new Runnable { + override def run(): Unit = { + self ! decorateMessage(RequestJobStatus(stopWhenJobFinished)) + } + } + )(context.dispatcher) + } + + sender() ! decorateMessage(Acknowledge) + + case msg: GetClusterStatus => + sender() ! decorateMessage( + new GetClusterStatusResponse( + instanceManager.getNumberOfRegisteredTaskManagers, + instanceManager.getTotalNumberOfSlots) + ) + + case jnf: JobNotFound => + log.debug(s"Job with ID ${jnf.jobID} not found in JobManager") + if (stopWhenJobFinished == null) { + log.warn("The ApplicationMaster didn't expect to receive this message") + } + + case jobStatus: CurrentJobStatus => + if (stopWhenJobFinished == null) { + log.warn(s"Received job status $jobStatus which wasn't requested.") + } else { + if (stopWhenJobFinished != jobStatus.jobID) { + log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " + + s"job $stopWhenJobFinished") + } else { + if (jobStatus.status.isGloballyTerminalState) { + log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + + s"Shutting down session") + if (jobStatus.status == JobStatus.FINISHED) { + self ! 
decorateMessage( + new StopCluster( + ApplicationStatus.SUCCEEDED, + s"The monitored job with ID ${jobStatus.jobID} has finished.") + ) + } else { + self ! decorateMessage( + new StopCluster( + ApplicationStatus.FAILED, + s"The monitored job with ID ${jobStatus.jobID} has failed to complete.") + ) + } + } else { + log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") + } + } + } + } +}
