[https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426378#comment-15426378]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r75301829
--- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework
+
+import java.util.concurrent.{TimeUnit, ExecutorService}
+
+import akka.actor.ActorRef
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.clusterframework.ApplicationStatus
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+
+/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages
+ * to start/administer/stop the session.
+ *
+ * @param flinkConfiguration Configuration object for the actor
+ * @param executorService Execution context which is used to execute concurrent tasks in the
+ *                        [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+ * @param instanceManager Instance manager to manage the registered
+ *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+ * @param scheduler Scheduler to schedule Flink jobs
+ * @param libraryCacheManager Manager to manage uploaded jar files
+ * @param archive Archive for finished Flink jobs
+ * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
+ * @param timeout Timeout for futures
+ * @param leaderElectionService LeaderElectionService to participate in the leader election
+ */
+abstract class ContaineredJobManager(
+ flinkConfiguration: FlinkConfiguration,
+ executorService: ExecutorService,
+ instanceManager: InstanceManager,
+ scheduler: FlinkScheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ restartStrategyFactory: RestartStrategyFactory,
+ timeout: FiniteDuration,
+ leaderElectionService: LeaderElectionService,
+ submittedJobGraphs : SubmittedJobGraphStore,
+ checkpointRecoveryFactory : CheckpointRecoveryFactory,
+ savepointStore: SavepointStore,
+ jobRecoveryTimeout: FiniteDuration,
+ metricsRegistry: Option[FlinkMetricRegistry])
+ extends JobManager(
+ flinkConfiguration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ restartStrategyFactory,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout,
+ metricsRegistry) {
+
+ val jobPollingInterval: FiniteDuration
+
+ // indicates if this JM has been started in a dedicated (per-job) mode.
+ var stopWhenJobFinished: JobID = null
+
+ override def handleMessage: Receive = {
+ handleContainerMessage orElse super.handleMessage
+ }
+
+ def handleContainerMessage: Receive = {
+
+ case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) =>
+ // forward to ResourceManager
+ currentResourceManager match {
+ case Some(rm) =>
+ // we forward the message
+ rm.forward(decorateMessage(msg))
+ case None =>
+ // client has to try again
+ }
+
+ case msg: ShutdownClusterAfterJob =>
+ val jobId = msg.jobId()
+ log.info(s"ApplicationMaster will shut down session when job $jobId has finished.")
+ stopWhenJobFinished = jobId
+ // trigger regular job status messages (if this is a dedicated/per-job cluster)
+ if (stopWhenJobFinished != null) {
+ context.system.scheduler.schedule(0 seconds,
--- End diff --
The polling is a leftover from the old Yarn code. Indeed, it would be nicer to apply a hook immediately upon job removal.
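Here is a minimal sketch of the hook-based alternative in plain Scala, with no Akka or Flink on the classpath. `JobId`, `JobRegistry`, `onJobRemoved` and `PerJobShutdown` are hypothetical names chosen for illustration; they are not Flink APIs. The idea is that the per-job shutdown decision is made the moment the job is removed, instead of waiting for the next polling tick.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.flink.api.common.JobID (not the real class).
final case class JobId(value: String)

// Hypothetical registry that notifies listeners as soon as a job is removed,
// replacing periodic RequestJobStatus polling with a push-style hook.
final class JobRegistry {
  private val jobs = mutable.Set.empty[JobId]
  private val removalHooks = mutable.ListBuffer.empty[JobId => Unit]

  def addJob(id: JobId): Unit = jobs += id

  // Register a callback that runs whenever a job is removed.
  def onJobRemoved(hook: JobId => Unit): Unit = removalHooks += hook

  // Remove the job and fire the hooks only if it was actually registered.
  def removeJob(id: JobId): Unit =
    if (jobs.remove(id)) removalHooks.foreach(hook => hook(id))
}

// Hypothetical per-job cluster logic: it uses Option instead of a null JobID
// and requests shutdown when the designated job is removed.
final class PerJobShutdown(registry: JobRegistry) {
  private var stopWhenJobFinished: Option[JobId] = None
  var shutdownRequested: Boolean = false

  registry.onJobRemoved { removed =>
    if (stopWhenJobFinished.contains(removed)) shutdownRequested = true
  }

  // Counterpart of handling ShutdownClusterAfterJob in the diff.
  def shutdownClusterAfterJob(id: JobId): Unit = stopWhenJobFinished = Some(id)
}
```

Removing an unrelated job leaves `shutdownRequested` unset. Removing the designated job sets it immediately, and no scheduler is involved.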
+1 for making `ContaineredJobManager` the base for the Yarn and Mesos JobManagers.
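A shared base class composes cleanly here because of how Akka message handlers work. Akka's `Receive` is an alias for `PartialFunction[Any, Unit]`, so `handleContainerMessage orElse super.handleMessage` tries the container-specific cases first and falls through to the base `JobManager` handler otherwise. The rough sketch below redefines that alias so it runs without Akka. The classes and messages (`BaseManager`, `ContaineredManager`, the two case objects) are hypothetical stand-ins, not the real Flink types.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical messages standing in for the ones handled in the diff.
case object RegisterInfoMessageListener
case object RequestJobStatus

object ReceiveChaining {
  // Mirrors Akka's Actor.Receive alias, redefined so no Akka dependency is needed.
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical base class playing the role of JobManager.
  class BaseManager {
    val log = ListBuffer.empty[String]
    def handleMessage: Receive = {
      case RequestJobStatus => log += "base: job status"
    }
  }

  // Hypothetical subclass playing the role of ContaineredJobManager.
  class ContaineredManager(resourceManager: Option[String]) extends BaseManager {
    // Container-specific cases are tried first; anything else goes to the base.
    override def handleMessage: Receive =
      handleContainerMessage orElse super.handleMessage

    def handleContainerMessage: Receive = {
      case RegisterInfoMessageListener =>
        resourceManager match {
          case Some(rm) => log += s"forwarded to $rm"     // forward to the RM
          case None     => log += "dropped; client retries" // no RM known yet
        }
    }
  }
}
```

A Yarn- or Mesos-specific subclass can layer its own handler on top in the same way, for example `handleYarnMessage orElse super.handleMessage`, without duplicating the container logic.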
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)