[FLINK-1984] port Mesos code to latest master

- move Scala code to /scala dir
- remove merge commits
- update version

This closes #2315


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/842e3e7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/842e3e7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/842e3e7d

Branch: refs/heads/master
Commit: 842e3e7d13c821fccd599642a417b6328915a366
Parents: 38a9534
Author: Maximilian Michels <[email protected]>
Authored: Mon Aug 29 17:08:01 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon Aug 29 17:32:51 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |   7 +-
 flink-mesos/pom.xml                             |   2 +-
 .../MesosApplicationMasterRunner.java           |   8 +-
 .../store/ZooKeeperMesosWorkerStore.java        |   2 +-
 .../ContaineredJobManager.scala                 | 172 -------------------
 .../ContaineredJobManager.scala                 | 172 +++++++++++++++++++
 6 files changed, 182 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2fe27e0..f0f1b6b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -742,6 +742,10 @@ public final class ConfigConstants {
        @PublicEvolving
        public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"high-availability.zookeeper.path.checkpoint-counter";
 
+       /** ZooKeeper root path (ZNode) for Mesos workers. */
+       @PublicEvolving
+       public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers";
+
        @PublicEvolving
        public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = 
"high-availability.zookeeper.client.session-timeout";
 
@@ -790,9 +794,6 @@ public final class ConfigConstants {
        @Deprecated
        public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"recovery.zookeeper.path.checkpoint-counter";
 
-       /** ZooKeeper root path (ZNode) for Mesos workers. */
-       public static final String ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers";
-
        /** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */
        @Deprecated
        public static final String ZOOKEEPER_SESSION_TIMEOUT = 
"recovery.zookeeper.client.session-timeout";

http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 43b3195..a6edc0b 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -23,7 +23,7 @@ under the License.
        <parent>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-parent</artifactId>
-               <version>1.1-SNAPSHOT</version>
+               <version>1.2-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
        

http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index c13cdf9..9916a87 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -38,9 +38,9 @@ import org.apache.flink.mesos.util.ZooKeeperUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -481,11 +481,11 @@ public class MesosApplicationMasterRunner {
 
        private static MesosWorkerStore createWorkerStore(Configuration 
flinkConfig) throws Exception {
                MesosWorkerStore workerStore;
-               RecoveryMode recoveryMode = RecoveryMode.fromConfig(flinkConfig);
-               if (recoveryMode == RecoveryMode.STANDALONE) {
+               HighAvailabilityMode recoveryMode = HighAvailabilityMode.fromConfig(flinkConfig);
+               if (recoveryMode == HighAvailabilityMode.NONE) {
                        workerStore = new StandaloneMesosWorkerStore();
                }
-               else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
+               else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) {
                        // note: the store is responsible for closing the 
client.
                        CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(flinkConfig);
                        workerStore = 
ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 45553d4..c5cef8e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -292,7 +292,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
                        
ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore");
 
                String zooKeeperMesosWorkerStorePath = configuration.getString(
-                       ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH,
+                       ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
                        ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH
                );
 

http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
deleted file mode 100644
index 45b404a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework
-
-import java.util.concurrent.ExecutorService
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.clusterframework.messages._
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
-import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, 
JobNotFound, RequestJobStatus}
-import org.apache.flink.runtime.messages.Messages.Acknowledge
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-
-/** JobManager actor for execution on Yarn or Mesos. It enriches the 
[[JobManager]] with additional messages
-  * to start/administer/stop the session.
-  *
-  * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute 
concurrent tasks in the
-  *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
-  * @param instanceManager Instance manager to manage the registered
-  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
-  * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
-  * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Restart strategy to be used in case of a job 
recovery
-  * @param timeout Timeout for futures
-  * @param leaderElectionService LeaderElectionService to participate in the 
leader election
-  */
-abstract class ContaineredJobManager(
-    flinkConfiguration: FlinkConfiguration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: FlinkScheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
-    jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry])
-  extends JobManager(
-    flinkConfiguration,
-    executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout,
-    metricsRegistry) {
-
-  val jobPollingInterval: FiniteDuration
-
-  // indicates if this JM has been started in a dedicated (per-job) mode.
-  var stopWhenJobFinished: JobID = null
-
-  override def handleMessage: Receive = {
-    handleContainerMessage orElse super.handleMessage
-  }
-
-  def handleContainerMessage: Receive = {
-
-    case msg @ (_: RegisterInfoMessageListener | _: 
UnRegisterInfoMessageListener) =>
-      // forward to ResourceManager
-      currentResourceManager match {
-        case Some(rm) =>
-          // we forward the message
-          rm.forward(decorateMessage(msg))
-        case None =>
-          // client has to try again
-      }
-
-    case msg: ShutdownClusterAfterJob =>
-      val jobId = msg.jobId()
-      log.info(s"ApplicationMaster will shut down session when job $jobId has 
finished.")
-      stopWhenJobFinished = jobId
-      // trigger regular job status messages (if this is a dedicated/per-job 
cluster)
-      if (stopWhenJobFinished != null) {
-        context.system.scheduler.schedule(0 seconds,
-          jobPollingInterval,
-          new Runnable {
-            override def run(): Unit = {
-              self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
-            }
-          }
-        )(context.dispatcher)
-      }
-
-      sender() ! decorateMessage(Acknowledge)
-
-    case msg: GetClusterStatus =>
-      sender() ! decorateMessage(
-        new GetClusterStatusResponse(
-          instanceManager.getNumberOfRegisteredTaskManagers,
-          instanceManager.getTotalNumberOfSlots)
-      )
-
-    case jnf: JobNotFound =>
-      log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
-      if (stopWhenJobFinished == null) {
-        log.warn("The ApplicationMaster didn't expect to receive this message")
-      }
-
-    case jobStatus: CurrentJobStatus =>
-      if (stopWhenJobFinished == null) {
-        log.warn(s"Received job status $jobStatus which wasn't requested.")
-      } else {
-        if (stopWhenJobFinished != jobStatus.jobID) {
-          log.warn(s"Received job status for job ${jobStatus.jobID} but 
expected status for " +
-            s"job $stopWhenJobFinished")
-        } else {
-          if (jobStatus.status.isGloballyTerminalState) {
-            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state 
${jobStatus.status}. " +
-              s"Shutting down session")
-            if (jobStatus.status == JobStatus.FINISHED) {
-              self ! decorateMessage(
-                new StopCluster(
-                  ApplicationStatus.SUCCEEDED,
-                  s"The monitored job with ID ${jobStatus.jobID} has 
finished.")
-              )
-            } else {
-              self ! decorateMessage(
-                new StopCluster(
-                  ApplicationStatus.FAILED,
-                  s"The monitored job with ID ${jobStatus.jobID} has failed to 
complete.")
-              )
-            }
-          } else {
-            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state 
${jobStatus.status}")
-          }
-        }
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/842e3e7d/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
new file mode 100644
index 0000000..5f965d2
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, 
JobNotFound, RequestJobStatus}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+
+/** JobManager actor for execution on Yarn or Mesos. It enriches the 
[[JobManager]] with additional messages
+  * to start/administer/stop the session.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executorService Execution context which is used to execute 
concurrent tasks in the
+  *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param restartStrategyFactory Restart strategy to be used in case of a job 
recovery
+  * @param timeout Timeout for futures
+  * @param leaderElectionService LeaderElectionService to participate in the 
leader election
+  */
+abstract class ContaineredJobManager(
+    flinkConfiguration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[FlinkMetricRegistry])
+  extends JobManager(
+    flinkConfiguration,
+    executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    restartStrategyFactory,
+    timeout,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout,
+    metricsRegistry) {
+
+  val jobPollingInterval: FiniteDuration
+
+  // indicates if this JM has been started in a dedicated (per-job) mode.
+  var stopWhenJobFinished: JobID = null
+
+  override def handleMessage: Receive = {
+    handleContainerMessage orElse super.handleMessage
+  }
+
+  def handleContainerMessage: Receive = {
+
+    case msg @ (_: RegisterInfoMessageListener | _: 
UnRegisterInfoMessageListener) =>
+      // forward to ResourceManager
+      currentResourceManager match {
+        case Some(rm) =>
+          // we forward the message
+          rm.forward(decorateMessage(msg))
+        case None =>
+          // client has to try again
+      }
+
+    case msg: ShutdownClusterAfterJob =>
+      val jobId = msg.jobId()
+      log.info(s"ApplicationMaster will shut down session when job $jobId has 
finished.")
+      stopWhenJobFinished = jobId
+      // trigger regular job status messages (if this is a dedicated/per-job 
cluster)
+      if (stopWhenJobFinished != null) {
+        context.system.scheduler.schedule(0 seconds,
+          jobPollingInterval,
+          new Runnable {
+            override def run(): Unit = {
+              self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
+            }
+          }
+        )(context.dispatcher)
+      }
+
+      sender() ! decorateMessage(Acknowledge)
+
+    case msg: GetClusterStatus =>
+      sender() ! decorateMessage(
+        new GetClusterStatusResponse(
+          instanceManager.getNumberOfRegisteredTaskManagers,
+          instanceManager.getTotalNumberOfSlots)
+      )
+
+    case jnf: JobNotFound =>
+      log.debug(s"Job with ID ${jnf.jobID} not found in JobManager")
+      if (stopWhenJobFinished == null) {
+        log.warn("The ApplicationMaster didn't expect to receive this message")
+      }
+
+    case jobStatus: CurrentJobStatus =>
+      if (stopWhenJobFinished == null) {
+        log.warn(s"Received job status $jobStatus which wasn't requested.")
+      } else {
+        if (stopWhenJobFinished != jobStatus.jobID) {
+          log.warn(s"Received job status for job ${jobStatus.jobID} but 
expected status for " +
+            s"job $stopWhenJobFinished")
+        } else {
+          if (jobStatus.status.isGloballyTerminalState) {
+            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state 
${jobStatus.status}. " +
+              s"Shutting down session")
+            if (jobStatus.status == JobStatus.FINISHED) {
+              self ! decorateMessage(
+                new StopCluster(
+                  ApplicationStatus.SUCCEEDED,
+                  s"The monitored job with ID ${jobStatus.jobID} has 
finished.")
+              )
+            } else {
+              self ! decorateMessage(
+                new StopCluster(
+                  ApplicationStatus.FAILED,
+                  s"The monitored job with ID ${jobStatus.jobID} has failed to 
complete.")
+              )
+            }
+          } else {
+            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state 
${jobStatus.status}")
+          }
+        }
+      }
+  }
+}

Reply via email to