[SPARK-18662] Move resource managers to separate directory

## What changes were proposed in this pull request?
* Moves yarn and mesos scheduler backends to resource-managers/ sub-directory (in preparation for https://issues.apache.org/jira/browse/SPARK-18278)
* Corresponding change in top-level pom.xml. Ref: https://github.com/apache/spark/pull/16061#issuecomment-263649340

## How was this patch tested?

* Manual tests

/cc rxin

Author: Anirudh <[email protected]>

Closes #16092 from foxish/fix-scheduler-structure-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81e5619c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81e5619c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81e5619c

Branch: refs/heads/master
Commit: 81e5619ca141a1d3a06547d2b682cbe3f135b360
Parents: a8ced76
Author: Anirudh <[email protected]>
Authored: Tue Dec 6 16:23:27 2016 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Dec 6 16:23:27 2016 -0800

----------------------------------------------------------------------
 dev/sparktestsupport/modules.py                 |    4 +-
 mesos/pom.xml                                   |  109 --
 ...pache.spark.scheduler.ExternalClusterManager |    1 -
 .../deploy/mesos/MesosClusterDispatcher.scala   |  119 --
 .../mesos/MesosClusterDispatcherArguments.scala |  149 --
 .../deploy/mesos/MesosDriverDescription.scala   |   70 -
 .../mesos/MesosExternalShuffleService.scala     |  131 --
 .../org/apache/spark/deploy/mesos/config.scala  |   59 -
 .../spark/deploy/mesos/ui/DriverPage.scala      |  179 --
 .../deploy/mesos/ui/MesosClusterPage.scala      |  136 --
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  |   49 -
 .../deploy/rest/mesos/MesosRestServer.scala     |  156 --
 .../spark/executor/MesosExecutorBackend.scala   |  131 --
 .../cluster/mesos/MesosClusterManager.scala     |   64 -
 .../mesos/MesosClusterPersistenceEngine.scala   |  134 --
 .../cluster/mesos/MesosClusterScheduler.scala   |  740 ---------
 .../mesos/MesosClusterSchedulerSource.scala     |   40 -
 .../MesosCoarseGrainedSchedulerBackend.scala    |  668 --------
 .../MesosFineGrainedSchedulerBackend.scala      |  444 -----
 .../mesos/MesosSchedulerBackendUtil.scala       |  165 --
 .../cluster/mesos/MesosSchedulerUtils.scala     |  524 ------
 .../cluster/mesos/MesosTaskLaunchData.scala     |   51 -
 .../MesosClusterDispatcherArgumentsSuite.scala  |   63 -
 .../mesos/MesosClusterDispatcherSuite.scala     |   40 -
 .../mesos/MesosClusterManagerSuite.scala        |   56 -
 .../mesos/MesosClusterSchedulerSuite.scala      |  239 ---
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  601 -------
 .../MesosFineGrainedSchedulerBackendSuite.scala |  385 -----
 .../mesos/MesosSchedulerUtilsSuite.scala        |  256 ---
 .../mesos/MesosTaskLaunchDataSuite.scala        |   36 -
 .../spark/scheduler/cluster/mesos/Utils.scala   |   91 --
 pom.xml                                         |    4 +-
 resource-managers/mesos/pom.xml                 |  109 ++
 ...pache.spark.scheduler.ExternalClusterManager |    1 +
 .../deploy/mesos/MesosClusterDispatcher.scala   |  119 ++
 .../mesos/MesosClusterDispatcherArguments.scala |  149 ++
 .../deploy/mesos/MesosDriverDescription.scala   |   70 +
 .../mesos/MesosExternalShuffleService.scala     |  131 ++
 .../org/apache/spark/deploy/mesos/config.scala  |   59 +
 .../spark/deploy/mesos/ui/DriverPage.scala      |  179 ++
 .../deploy/mesos/ui/MesosClusterPage.scala      |  136 ++
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  |   49 +
 .../deploy/rest/mesos/MesosRestServer.scala     |  156 ++
 .../spark/executor/MesosExecutorBackend.scala   |  131 ++
 .../cluster/mesos/MesosClusterManager.scala     |   64 +
 .../mesos/MesosClusterPersistenceEngine.scala   |  134 ++
 .../cluster/mesos/MesosClusterScheduler.scala   |  740 +++++++++
 .../mesos/MesosClusterSchedulerSource.scala     |   40 +
 .../MesosCoarseGrainedSchedulerBackend.scala    |  668 ++++++++
.../MesosFineGrainedSchedulerBackend.scala | 444 +++++ .../mesos/MesosSchedulerBackendUtil.scala | 165 ++ .../cluster/mesos/MesosSchedulerUtils.scala | 524 ++++++ .../cluster/mesos/MesosTaskLaunchData.scala | 51 + .../MesosClusterDispatcherArgumentsSuite.scala | 63 + .../mesos/MesosClusterDispatcherSuite.scala | 40 + .../mesos/MesosClusterManagerSuite.scala | 56 + .../mesos/MesosClusterSchedulerSuite.scala | 239 +++ ...esosCoarseGrainedSchedulerBackendSuite.scala | 601 +++++++ .../MesosFineGrainedSchedulerBackendSuite.scala | 385 +++++ .../mesos/MesosSchedulerUtilsSuite.scala | 256 +++ .../mesos/MesosTaskLaunchDataSuite.scala | 36 + .../spark/scheduler/cluster/mesos/Utils.scala | 91 ++ resource-managers/yarn/pom.xml | 215 +++ ...ploy.yarn.security.ServiceCredentialProvider | 3 + ...pache.spark.scheduler.ExternalClusterManager | 1 + .../spark/deploy/yarn/ApplicationMaster.scala | 791 +++++++++ .../yarn/ApplicationMasterArguments.scala | 105 ++ .../org/apache/spark/deploy/yarn/Client.scala | 1541 ++++++++++++++++++ .../spark/deploy/yarn/ClientArguments.scala | 86 + .../yarn/ClientDistributedCacheManager.scala | 186 +++ .../spark/deploy/yarn/ExecutorRunnable.scala | 266 +++ ...ityPreferredContainerPlacementStrategy.scala | 224 +++ .../spark/deploy/yarn/YarnAllocator.scala | 727 +++++++++ .../apache/spark/deploy/yarn/YarnRMClient.scala | 135 ++ .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 317 ++++ .../org/apache/spark/deploy/yarn/config.scala | 347 ++++ .../yarn/security/AMCredentialRenewer.scala | 235 +++ .../ConfigurableCredentialManager.scala | 105 ++ .../yarn/security/CredentialUpdater.scala | 130 ++ .../yarn/security/HBaseCredentialProvider.scala | 74 + .../yarn/security/HDFSCredentialProvider.scala | 110 ++ .../yarn/security/HiveCredentialProvider.scala | 129 ++ .../security/ServiceCredentialProvider.scala | 57 + .../launcher/YarnCommandBuilderUtils.scala | 53 + .../cluster/SchedulerExtensionService.scala | 143 ++ .../cluster/YarnClientSchedulerBackend.scala | 157 ++ .../scheduler/cluster/YarnClusterManager.scala | 56 + .../cluster/YarnClusterScheduler.scala | 37 + .../cluster/YarnClusterSchedulerBackend.scala | 67 + .../spark/scheduler/cluster/YarnScheduler.scala | 39 + .../cluster/YarnSchedulerBackend.scala | 315 ++++ ...ploy.yarn.security.ServiceCredentialProvider | 1 + .../yarn/src/test/resources/log4j.properties | 31 + .../deploy/yarn/BaseYarnClusterSuite.scala | 241 +++ .../ClientDistributedCacheManagerSuite.scala | 204 +++ .../apache/spark/deploy/yarn/ClientSuite.scala | 462 ++++++ .../yarn/ContainerPlacementStrategySuite.scala | 153 ++ .../spark/deploy/yarn/YarnAllocatorSuite.scala | 344 ++++ .../spark/deploy/yarn/YarnClusterSuite.scala | 493 ++++++ .../yarn/YarnShuffleIntegrationSuite.scala | 112 ++ .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 213 +++ .../ConfigurableCredentialManagerSuite.scala | 150 ++ .../security/HDFSCredentialProviderSuite.scala | 71 + .../spark/launcher/TestClasspathBuilder.scala | 36 + .../network/shuffle/ShuffleTestAccessor.scala | 70 + .../network/yarn/YarnShuffleServiceSuite.scala | 372 +++++ .../spark/network/yarn/YarnTestAccessor.scala | 37 + .../ExtensionServiceIntegrationSuite.scala | 72 + .../cluster/SimpleExtensionService.scala | 34 + .../cluster/StubApplicationAttemptId.scala | 48 + .../scheduler/cluster/StubApplicationId.scala | 42 + yarn/pom.xml | 215 --- ...ploy.yarn.security.ServiceCredentialProvider | 3 - ...pache.spark.scheduler.ExternalClusterManager | 1 - .../spark/deploy/yarn/ApplicationMaster.scala | 791 --------- 
.../yarn/ApplicationMasterArguments.scala | 105 -- .../org/apache/spark/deploy/yarn/Client.scala | 1541 ------------------ .../spark/deploy/yarn/ClientArguments.scala | 86 - .../yarn/ClientDistributedCacheManager.scala | 186 --- .../spark/deploy/yarn/ExecutorRunnable.scala | 266 --- ...ityPreferredContainerPlacementStrategy.scala | 224 --- .../spark/deploy/yarn/YarnAllocator.scala | 727 --------- .../apache/spark/deploy/yarn/YarnRMClient.scala | 135 -- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 317 ---- .../org/apache/spark/deploy/yarn/config.scala | 347 ---- .../yarn/security/AMCredentialRenewer.scala | 235 --- .../ConfigurableCredentialManager.scala | 105 -- .../yarn/security/CredentialUpdater.scala | 130 -- .../yarn/security/HBaseCredentialProvider.scala | 74 - .../yarn/security/HDFSCredentialProvider.scala | 110 -- .../yarn/security/HiveCredentialProvider.scala | 129 -- .../security/ServiceCredentialProvider.scala | 57 - .../launcher/YarnCommandBuilderUtils.scala | 53 - .../cluster/SchedulerExtensionService.scala | 143 -- .../cluster/YarnClientSchedulerBackend.scala | 157 -- .../scheduler/cluster/YarnClusterManager.scala | 56 - .../cluster/YarnClusterScheduler.scala | 37 - .../cluster/YarnClusterSchedulerBackend.scala | 67 - .../spark/scheduler/cluster/YarnScheduler.scala | 39 - .../cluster/YarnSchedulerBackend.scala | 315 ---- ...ploy.yarn.security.ServiceCredentialProvider | 1 - yarn/src/test/resources/log4j.properties | 31 - .../deploy/yarn/BaseYarnClusterSuite.scala | 241 --- .../ClientDistributedCacheManagerSuite.scala | 204 --- .../apache/spark/deploy/yarn/ClientSuite.scala | 462 ------ .../yarn/ContainerPlacementStrategySuite.scala | 153 -- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 344 ---- .../spark/deploy/yarn/YarnClusterSuite.scala | 493 ------ .../yarn/YarnShuffleIntegrationSuite.scala | 112 -- .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 213 --- .../ConfigurableCredentialManagerSuite.scala | 150 -- .../security/HDFSCredentialProviderSuite.scala | 71 - .../spark/launcher/TestClasspathBuilder.scala | 36 - .../network/shuffle/ShuffleTestAccessor.scala | 70 - .../network/yarn/YarnShuffleServiceSuite.scala | 372 ----- .../spark/network/yarn/YarnTestAccessor.scala | 37 - .../ExtensionServiceIntegrationSuite.scala | 72 - .../cluster/SimpleExtensionService.scala | 34 - .../cluster/StubApplicationAttemptId.scala | 48 - .../scheduler/cluster/StubApplicationId.scala | 42 - 160 files changed, 15727 insertions(+), 15727 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/dev/sparktestsupport/modules.py ---------------------------------------------------------------------- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index b34ab51..1a7cf9a 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -469,7 +469,7 @@ yarn = Module( name="yarn", dependencies=[], source_file_regexes=[ - "yarn/", + "resource-managers/yarn/", "common/network-yarn/", ], build_profile_flags=["-Pyarn"], @@ -485,7 +485,7 @@ yarn = Module( mesos = Module( name="mesos", dependencies=[], - source_file_regexes=["mesos/"], + source_file_regexes=["resource-managers/mesos/"], build_profile_flags=["-Pmesos"], sbt_test_goals=["mesos/test"] ) http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/pom.xml ---------------------------------------------------------------------- diff --git a/mesos/pom.xml b/mesos/pom.xml deleted file mode 100644 index 
60f2359..0000000 --- a/mesos/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.2.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>spark-mesos_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Project Mesos</name> - <properties> - <sbt.project.name>mesos</sbt.project.name> - <mesos.version>1.0.0</mesos.version> - <mesos.classifier>shaded-protobuf</mesos.classifier> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.mesos</groupId> - <artifactId>mesos</artifactId> - <version>${mesos.version}</version> - <classifier>${mesos.classifier}</classifier> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - - <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-server</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-plus</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-util</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-http</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlets</artifactId> - </dependency> - <!-- End of shaded deps. 
--> - - </dependencies> - - - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager ---------------------------------------------------------------------- diff --git a/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager deleted file mode 100644 index 12b6d5b..0000000 --- a/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.scheduler.cluster.mesos.MesosClusterManager http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala deleted file mode 100644 index 792ade8..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import java.util.concurrent.CountDownLatch - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.mesos.config._ -import org.apache.spark.deploy.mesos.ui.MesosClusterUI -import org.apache.spark.deploy.rest.mesos.MesosRestServer -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.cluster.mesos._ -import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, Utils} - -/* - * A dispatcher that is responsible for managing and launching drivers, and is intended to be - * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in - * the cluster independently of Spark applications. - * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a - * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master - * for resources. 
- * - * A typical new driver lifecycle is the following: - * - Driver submitted via spark-submit talking to the [[MesosRestServer]] - * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]] - * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue - * - * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable - * per driver launched. - * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as - * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and - * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively. - */ -private[mesos] class MesosClusterDispatcher( - args: MesosClusterDispatcherArguments, - conf: SparkConf) - extends Logging { - - private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host) - private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase() - logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode) - - private val engineFactory = recoveryMode match { - case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory - case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf) - case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode) - } - - private val scheduler = new MesosClusterScheduler(engineFactory, conf) - - private val server = new MesosRestServer(args.host, args.port, conf, scheduler) - private val webUi = new MesosClusterUI( - new SecurityManager(conf), - args.webUiPort, - conf, - publicAddress, - scheduler) - - private val shutdownLatch = new CountDownLatch(1) - - def start(): Unit = { - webUi.bind() - scheduler.frameworkUrl = conf.get(DISPATCHER_WEBUI_URL).getOrElse(webUi.activeWebUiUrl) - scheduler.start() - server.start() - } - - def awaitShutdown(): Unit = { - shutdownLatch.await() - } - - def stop(): Unit = { - webUi.stop() - server.stop() - scheduler.stop() - shutdownLatch.countDown() - } -} - -private[mesos] object MesosClusterDispatcher - extends Logging - with CommandLineUtils { - - override def main(args: Array[String]) { - Utils.initDaemon(log) - val conf = new SparkConf - val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) - conf.setMaster(dispatcherArgs.masterUrl) - conf.setAppName(dispatcherArgs.name) - dispatcherArgs.zookeeperUrl.foreach { z => - conf.set(RECOVERY_MODE, "ZOOKEEPER") - conf.set(ZOOKEEPER_URL, z) - } - val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) - dispatcher.start() - logDebug("Adding shutdown hook") // force eager creation of logger - ShutdownHookManager.addShutdownHook { () => - logInfo("Shutdown hook is shutting down dispatcher") - dispatcher.stop() - dispatcher.awaitShutdown() - } - dispatcher.awaitShutdown() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala deleted file mode 100644 index ef08502..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import scala.annotation.tailrec -import scala.collection.mutable - -import org.apache.spark.util.{IntParam, Utils} -import org.apache.spark.SparkConf - -private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) { - var host: String = Utils.localHostName() - var port: Int = 7077 - var name: String = "Spark Cluster" - var webUiPort: Int = 8081 - var verbose: Boolean = false - var masterUrl: String = _ - var zookeeperUrl: Option[String] = None - var propertiesFile: String = _ - val confProperties: mutable.HashMap[String, String] = - new mutable.HashMap[String, String]() - - parse(args.toList) - - // scalastyle:on println - propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) - Utils.updateSparkConfigFromProperties(conf, confProperties) - - // scalastyle:off println - if (verbose) { - MesosClusterDispatcher.printStream.println(s"Using host: $host") - MesosClusterDispatcher.printStream.println(s"Using port: $port") - MesosClusterDispatcher.printStream.println(s"Using webUiPort: $webUiPort") - MesosClusterDispatcher.printStream.println(s"Framework Name: $name") - - Option(propertiesFile).foreach { file => - MesosClusterDispatcher.printStream.println(s"Using properties file: $file") - } - - MesosClusterDispatcher.printStream.println(s"Spark Config properties set:") - conf.getAll.foreach(println) - } - - @tailrec - private def parse(args: List[String]): Unit = args match { - case ("--host" | "-h") :: value :: tail => - Utils.checkHost(value, "Please use hostname " + value) - host = value - parse(tail) - - case ("--port" | "-p") :: IntParam(value) :: tail => - port = value - parse(tail) - - case ("--webui-port") :: IntParam(value) :: tail => - webUiPort = value - parse(tail) - - case ("--zk" | "-z") :: value :: tail => - zookeeperUrl = Some(value) - parse(tail) - - case ("--master" | "-m") :: value :: tail => - if (!value.startsWith("mesos://")) { - // scalastyle:off println - MesosClusterDispatcher.printStream - .println("Cluster dispatcher only supports mesos (uri begins with mesos://)") - // scalastyle:on println - MesosClusterDispatcher.exitFn(1) - } - masterUrl = value.stripPrefix("mesos://") - parse(tail) - - case ("--name") :: value :: tail => - name = value - parse(tail) - - case ("--properties-file") :: value :: tail => - propertiesFile = value - parse(tail) - - case ("--conf") :: value :: tail => - val pair = MesosClusterDispatcher. 
- parseSparkConfProperty(value) - confProperties(pair._1) = pair._2 - parse(tail) - - case ("--help") :: tail => - printUsageAndExit(0) - - case ("--verbose") :: tail => - verbose = true - parse(tail) - - case Nil => - if (Option(masterUrl).isEmpty) { - // scalastyle:off println - MesosClusterDispatcher.printStream.println("--master is required") - // scalastyle:on println - printUsageAndExit(1) - } - - case value => - // scalastyle:off println - MesosClusterDispatcher.printStream.println(s"Unrecognized option: '${value.head}'") - // scalastyle:on println - printUsageAndExit(1) - } - - private def printUsageAndExit(exitCode: Int): Unit = { - val outStream = MesosClusterDispatcher.printStream - - // scalastyle:off println - outStream.println( - "Usage: MesosClusterDispatcher [options]\n" + - "\n" + - "Options:\n" + - " -h HOST, --host HOST Hostname to listen on\n" + - " --help Show this help message and exit.\n" + - " --verbose, Print additional debug output.\n" + - " -p PORT, --port PORT Port to listen on (default: 7077)\n" + - " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" + - " --name NAME Framework name to show in Mesos UI\n" + - " -m --master MASTER URI for connecting to Mesos master\n" + - " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" + - " Zookeeper for persistence\n" + - " --properties-file FILE Path to a custom Spark properties file.\n" + - " Default is conf/spark-defaults.conf \n" + - " --conf PROP=VALUE Arbitrary Spark configuration property.\n" + - " Takes precedence over defined properties in properties-file.") - // scalastyle:on println - MesosClusterDispatcher.exitFn(exitCode) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala deleted file mode 100644 index d4c7022..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import java.util.Date - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.Command -import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState - -/** - * Describes a Spark driver that is submitted from the - * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by - * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. 
- * @param jarUrl URL to the application jar - * @param mem Amount of memory for the driver - * @param cores Number of cores for the driver - * @param supervise Supervise the driver for long running app - * @param command The command to launch the driver. - * @param schedulerProperties Extra properties to pass the Mesos scheduler - */ -private[spark] class MesosDriverDescription( - val name: String, - val jarUrl: String, - val mem: Int, - val cores: Double, - val supervise: Boolean, - val command: Command, - schedulerProperties: Map[String, String], - val submissionId: String, - val submissionDate: Date, - val retryState: Option[MesosClusterRetryState] = None) - extends Serializable { - - val conf = new SparkConf(false) - schedulerProperties.foreach {case (k, v) => conf.set(k, v)} - - def copy( - name: String = name, - jarUrl: String = jarUrl, - mem: Int = mem, - cores: Double = cores, - supervise: Boolean = supervise, - command: Command = command, - schedulerProperties: SparkConf = conf, - submissionId: String = submissionId, - submissionDate: Date = submissionDate, - retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = { - - new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap, - submissionId, submissionDate, retryState) - } - - override def toString: String = s"MesosDriverDescription (${command.mainClass})" -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala deleted file mode 100644 index 859aa83..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.mesos - -import java.nio.ByteBuffer -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.ExternalShuffleService -import org.apache.spark.deploy.mesos.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage -import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} -import org.apache.spark.network.util.TransportConf -import org.apache.spark.util.ThreadUtils - -/** - * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. - * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. - */ -private[mesos] class MesosExternalShuffleBlockHandler( - transportConf: TransportConf, - cleanerIntervalS: Long) - extends ExternalShuffleBlockHandler(transportConf, null) with Logging { - - ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") - .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) - - // Stores a map of app id to app state (timeout value and last heartbeat) - private val connectedApps = new ConcurrentHashMap[String, AppState]() - - protected override def handleMessage( - message: BlockTransferMessage, - client: TransportClient, - callback: RpcResponseCallback): Unit = { - message match { - case RegisterDriverParam(appId, appState) => - val address = client.getSocketAddress - val timeout = appState.heartbeatTimeout - logInfo(s"Received registration request from app $appId (remote address $address, " + - s"heartbeat timeout $timeout ms).") - if (connectedApps.containsKey(appId)) { - logWarning(s"Received a registration request from app $appId, but it was already " + - s"registered") - } - connectedApps.put(appId, appState) - callback.onSuccess(ByteBuffer.allocate(0)) - case Heartbeat(appId) => - val address = client.getSocketAddress - Option(connectedApps.get(appId)) match { - case Some(existingAppState) => - logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + - s"address $address).") - existingAppState.lastHeartbeat = System.nanoTime() - case None => - logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + - s"address $address, appId '$appId').") - } - case _ => super.handleMessage(message, client, callback) - } - } - - /** An extractor object for matching [[RegisterDriver]] message. */ - private object RegisterDriverParam { - def unapply(r: RegisterDriver): Option[(String, AppState)] = - Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) - } - - private object Heartbeat { - def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) - } - - private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) - - private class CleanerThread extends Runnable { - override def run(): Unit = { - val now = System.nanoTime() - connectedApps.asScala.foreach { case (appId, appState) => - if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { - logInfo(s"Application $appId timed out. 
Removing shuffle files.") - connectedApps.remove(appId) - applicationRemoved(appId, true) - } - } - } - } -} - -/** - * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers - * to associate with. This allows the shuffle service to detect when a driver is terminated - * and can clean up the associated shuffle files. - */ -private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager) - extends ExternalShuffleService(conf, securityManager) { - - protected override def newShuffleBlockHandler( - conf: TransportConf): ExternalShuffleBlockHandler = { - val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S) - new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) - } -} - -private[spark] object MesosExternalShuffleService extends Logging { - - def main(args: Array[String]): Unit = { - ExternalShuffleService.main(args, - (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm)) - } -} - - http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala deleted file mode 100644 index 19e2533..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import java.util.concurrent.TimeUnit - -import org.apache.spark.internal.config.ConfigBuilder - -package object config { - - /* Common app configuration. */ - - private[spark] val SHUFFLE_CLEANER_INTERVAL_S = - ConfigBuilder("spark.shuffle.cleaner.interval") - .timeConf(TimeUnit.SECONDS) - .createWithDefaultString("30s") - - private[spark] val RECOVERY_MODE = - ConfigBuilder("spark.deploy.recoveryMode") - .stringConf - .createWithDefault("NONE") - - private[spark] val DISPATCHER_WEBUI_URL = - ConfigBuilder("spark.mesos.dispatcher.webui.url") - .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " + - "framework. If unset it will point to Spark's internal web UI.") - .stringConf - .createOptional - - private[spark] val ZOOKEEPER_URL = - ConfigBuilder("spark.deploy.zookeeper.url") - .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " + - "configuration is used to set the zookeeper URL to connect to.") - .stringConf - .createOptional - - private[spark] val HISTORY_SERVER_URL = - ConfigBuilder("spark.mesos.dispatcher.historyServer.url") - .doc("Set the URL of the history server. 
The dispatcher will then " + - "link each driver to its entry in the history server.") - .stringConf - .createOptional - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala deleted file mode 100644 index cd98110..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos.ui - -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.spark.deploy.Command -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState} -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") { - - override def render(request: HttpServletRequest): Seq[Node] = { - val driverId = request.getParameter("id") - require(driverId != null && driverId.nonEmpty, "Missing id parameter") - - val state = parent.scheduler.getDriverState(driverId) - if (state.isEmpty) { - val content = - <div> - <p>Cannot find driver {driverId}</p> - </div> - return UIUtils.basicSparkPage(content, s"Details for Job $driverId") - } - val driverState = state.get - val driverHeaders = Seq("Driver property", "Value") - val schedulerHeaders = Seq("Scheduler property", "Value") - val commandEnvHeaders = Seq("Command environment variable", "Value") - val launchedHeaders = Seq("Launched property", "Value") - val commandHeaders = Seq("Command property", "Value") - val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count") - val driverDescription = Iterable.apply(driverState.description) - val submissionState = Iterable.apply(driverState.submissionState) - val command = Iterable.apply(driverState.description.command) - val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap) - val commandEnv = Iterable.apply(driverState.description.command.environment) - val driverTable = - UIUtils.listingTable(driverHeaders, driverRow, driverDescription) - val commandTable = - UIUtils.listingTable(commandHeaders, commandRow, command) - val commandEnvTable = - UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv) - val schedulerTable = - UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties) - val launchedTable = - UIUtils.listingTable(launchedHeaders, launchedRow, submissionState) - val 
retryTable = - UIUtils.listingTable( - retryHeaders, retryRow, Iterable.apply(driverState.description.retryState)) - val content = - <p>Driver state information for driver id {driverId}</p> - <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a> - <div class="row-fluid"> - <div class="span12"> - <h4>Driver state: {driverState.state}</h4> - <h4>Driver properties</h4> - {driverTable} - <h4>Driver command</h4> - {commandTable} - <h4>Driver command environment</h4> - {commandEnvTable} - <h4>Scheduler properties</h4> - {schedulerTable} - <h4>Launched state</h4> - {launchedTable} - <h4>Retry state</h4> - {retryTable} - </div> - </div>; - - UIUtils.basicSparkPage(content, s"Details for Job $driverId") - } - - private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = { - submissionState.map { state => - <tr> - <td>Mesos Slave ID</td> - <td>{state.slaveId.getValue}</td> - </tr> - <tr> - <td>Mesos Task ID</td> - <td>{state.taskId.getValue}</td> - </tr> - <tr> - <td>Launch Time</td> - <td>{state.startDate}</td> - </tr> - <tr> - <td>Finish Time</td> - <td>{state.finishDate.map(_.toString).getOrElse("")}</td> - </tr> - <tr> - <td>Last Task Status</td> - <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> - </tr> - }.getOrElse(Seq[Node]()) - } - - private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { - properties.map { case (k, v) => - <tr> - <td>{k}</td><td>{v}</td> - </tr> - }.toSeq - } - - private def commandRow(command: Command): Seq[Node] = { - <tr> - <td>Main class</td><td>{command.mainClass}</td> - </tr> - <tr> - <td>Arguments</td><td>{command.arguments.mkString(" ")}</td> - </tr> - <tr> - <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td> - </tr> - <tr> - <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td> - </tr> - <tr> - <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td> - </tr> - } - - private def driverRow(driver: MesosDriverDescription): Seq[Node] = { - <tr> - <td>Name</td><td>{driver.name}</td> - </tr> - <tr> - <td>Id</td><td>{driver.submissionId}</td> - </tr> - <tr> - <td>Cores</td><td>{driver.cores}</td> - </tr> - <tr> - <td>Memory</td><td>{driver.mem}</td> - </tr> - <tr> - <td>Submitted</td><td>{driver.submissionDate}</td> - </tr> - <tr> - <td>Supervise</td><td>{driver.supervise}</td> - </tr> - } - - private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = { - retryState.map { state => - <tr> - <td> - {state.lastFailureStatus} - </td> - <td> - {state.nextRetry} - </td> - <td> - {state.retries} - </td> - </tr> - }.getOrElse(Seq[Node]()) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala deleted file mode 100644 index 13ba7d3..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos.ui - -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.mesos.Protos.TaskStatus - -import org.apache.spark.deploy.mesos.config._ -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") { - private val historyServerURL = parent.conf.get(HISTORY_SERVER_URL) - - def render(request: HttpServletRequest): Seq[Node] = { - val state = parent.scheduler.getSchedulerState() - - val driverHeader = Seq("Driver ID") - val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil) - val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources") - - val queuedHeaders = driverHeader ++ submissionHeader - val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++ - Seq("Start Date", "Mesos Slave ID", "State") - val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++ - Seq("Last Failed Status", "Next Retry Time", "Attempt Count") - val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers) - val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers) - val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers) - val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers) - val content = - <p>Mesos Framework ID: {state.frameworkId}</p> - <div class="row-fluid"> - <div class="span12"> - <h4>Queued Drivers:</h4> - {queuedTable} - <h4>Launched Drivers:</h4> - {launchedTable} - <h4>Finished Drivers:</h4> - {finishedTable} - <h4>Supervise drivers waiting for retry:</h4> - {retryTable} - </div> - </div>; - UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster") - } - - private def queuedRow(submission: MesosDriverDescription): Seq[Node] = { - val id = submission.submissionId - <tr> - <td><a href={s"driver?id=$id"}>{id}</a></td> - <td>{submission.submissionDate}</td> - <td>{submission.command.mainClass}</td> - <td>cpus: {submission.cores}, mem: {submission.mem}</td> - </tr> - } - - private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { - val id = state.driverDescription.submissionId - - val historyCol = if (historyServerURL.isDefined) { - <td> - <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}> - {state.frameworkId} - </a> - </td> - } else Nil - - <tr> - <td><a href={s"driver?id=$id"}>{id}</a></td> - {historyCol} - <td>{state.driverDescription.submissionDate}</td> - <td>{state.driverDescription.command.mainClass}</td> - <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td> - <td>{state.startDate}</td> - <td>{state.slaveId.getValue}</td> - <td>{stateString(state.mesosTaskStatus)}</td> - </tr> - } - 
- private def retryRow(submission: MesosDriverDescription): Seq[Node] = { - val id = submission.submissionId - <tr> - <td><a href={s"driver?id=$id"}>{id}</a></td> - <td>{submission.submissionDate}</td> - <td>{submission.command.mainClass}</td> - <td>{submission.retryState.get.lastFailureStatus}</td> - <td>{submission.retryState.get.nextRetry}</td> - <td>{submission.retryState.get.retries}</td> - </tr> - } - - private def stateString(status: Option[TaskStatus]): String = { - if (status.isEmpty) { - return "" - } - val sb = new StringBuilder - val s = status.get - sb.append(s"State: ${s.getState}") - if (status.get.hasMessage) { - sb.append(s", Message: ${s.getMessage}") - } - if (status.get.hasHealthy) { - sb.append(s", Healthy: ${s.getHealthy}") - } - if (status.get.hasSource) { - sb.append(s", Source: ${s.getSource}") - } - if (status.get.hasReason) { - sb.append(s", Reason: ${s.getReason}") - } - if (status.get.hasTimestamp) { - sb.append(s", Time: ${s.getTimestamp}") - } - sb.toString() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala deleted file mode 100644 index 6049789..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.mesos.ui - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler -import org.apache.spark.ui.{SparkUI, WebUI} -import org.apache.spark.ui.JettyUtils._ - -/** - * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]] - */ -private[spark] class MesosClusterUI( - securityManager: SecurityManager, - port: Int, - val conf: SparkConf, - dispatcherPublicAddress: String, - val scheduler: MesosClusterScheduler) - extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { - - initialize() - - def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort - - override def initialize() { - attachPage(new MesosClusterPage(this)) - attachPage(new DriverPage(this)) - attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static")) - } -} - -private object MesosClusterUI { - val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala deleted file mode 100644 index ff60b88..0000000 --- a/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.rest.mesos - -import java.io.File -import java.text.SimpleDateFormat -import java.util.{Date, Locale} -import java.util.concurrent.atomic.AtomicLong -import javax.servlet.http.HttpServletResponse - -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} -import org.apache.spark.deploy.Command -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.deploy.rest._ -import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler -import org.apache.spark.util.Utils - -/** - * A server that responds to requests submitted by the [[RestSubmissionClient]]. - * All requests are forwarded to - * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. - * This is intended to be used in Mesos cluster mode only. - * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs. 
- */ -private[spark] class MesosRestServer( - host: String, - requestedPort: Int, - masterConf: SparkConf, - scheduler: MesosClusterScheduler) - extends RestSubmissionServer(host, requestedPort, masterConf) { - - protected override val submitRequestServlet = - new MesosSubmitRequestServlet(scheduler, masterConf) - protected override val killRequestServlet = - new MesosKillRequestServlet(scheduler, masterConf) - protected override val statusRequestServlet = - new MesosStatusRequestServlet(scheduler, masterConf) -} - -private[mesos] class MesosSubmitRequestServlet( - scheduler: MesosClusterScheduler, - conf: SparkConf) - extends SubmitRequestServlet { - - private val DEFAULT_SUPERVISE = false - private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb - private val DEFAULT_CORES = 1.0 - - private val nextDriverNumber = new AtomicLong(0) - // For application IDs - private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) - private def newDriverId(submitDate: Date): String = - f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d" - - /** - * Build a driver description from the fields specified in the submit request. - * - * This involves constructing a command that launches a mesos framework for the job. - * This does not currently consider fields used by python applications since python - * is not supported in mesos cluster mode yet. - */ - private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = { - // Required fields, including the main class because python is not yet supported - val appResource = Option(request.appResource).getOrElse { - throw new SubmitRestMissingFieldException("Application jar is missing.") - } - val mainClass = Option(request.mainClass).getOrElse { - throw new SubmitRestMissingFieldException("Main class is missing.") - } - - // Optional fields - val sparkProperties = request.sparkProperties - val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") - val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") - val superviseDriver = sparkProperties.get("spark.driver.supervise") - val driverMemory = sparkProperties.get("spark.driver.memory") - val driverCores = sparkProperties.get("spark.driver.cores") - val appArgs = request.appArgs - val environmentVariables = request.environmentVariables - val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) - - // Construct driver description - val conf = new SparkConf(false).setAll(sparkProperties) - val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) - val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) - val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf) - val javaOpts = sparkJavaOpts ++ extraJavaOpts - val command = new Command( - mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) - val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) - val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) - val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) - val submitDate = new Date() - val submissionId = newDriverId(submitDate) - - new MesosDriverDescription( - name, appResource, actualDriverMemory, 
actualDriverCores, actualSuperviseDriver, - command, request.sparkProperties, submissionId, submitDate) - } - - protected override def handleSubmit( - requestMessageJson: String, - requestMessage: SubmitRestProtocolMessage, - responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { - requestMessage match { - case submitRequest: CreateSubmissionRequest => - val driverDescription = buildDriverDescription(submitRequest) - val s = scheduler.submitDriver(driverDescription) - s.serverSparkVersion = sparkVersion - val unknownFields = findUnknownFields(requestMessageJson, requestMessage) - if (unknownFields.nonEmpty) { - // If there are fields that the server does not know about, warn the client - s.unknownFields = unknownFields - } - s - case unexpected => - responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST) - handleError(s"Received message of unexpected type ${unexpected.messageType}.") - } - } -} - -private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) - extends KillRequestServlet { - protected override def handleKill(submissionId: String): KillSubmissionResponse = { - val k = scheduler.killDriver(submissionId) - k.serverSparkVersion = sparkVersion - k - } -} - -private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) - extends StatusRequestServlet { - protected override def handleStatus(submissionId: String): SubmissionStatusResponse = { - val d = scheduler.getDriverStatus(submissionId) - d.serverSparkVersion = sparkVersion - d - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala deleted file mode 100644 index ee9149c..0000000 --- a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.executor - -import java.nio.ByteBuffer - -import scala.collection.JavaConverters._ - -import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver} -import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} -import org.apache.mesos.protobuf.ByteString - -import org.apache.spark.{SparkConf, SparkEnv, TaskState} -import org.apache.spark.TaskState -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData} -import org.apache.spark.util.Utils - -private[spark] class MesosExecutorBackend - extends MesosExecutor - with MesosSchedulerUtils // TODO: fix - with ExecutorBackend - with Logging { - - var executor: Executor = null - var driver: ExecutorDriver = null - - override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) { - val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() - driver.sendStatusUpdate(MesosTaskStatus.newBuilder() - .setTaskId(mesosTaskId) - .setState(taskStateToMesos(state)) - .setData(ByteString.copyFrom(data)) - .build()) - } - - override def registered( - driver: ExecutorDriver, - executorInfo: ExecutorInfo, - frameworkInfo: FrameworkInfo, - slaveInfo: SlaveInfo) { - - // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend. - val cpusPerTask = executorInfo.getResourcesList.asScala - .find(_.getName == "cpus") - .map(_.getScalar.getValue.toInt) - .getOrElse(0) - val executorId = executorInfo.getExecutorId.getValue - - logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus") - this.driver = driver - // Set a context class loader to be picked up by the serializer. Without this call - // the serializer would default to the null class loader, and fail to find Spark classes - // See SPARK-10986. - Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader) - - val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ - Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) - val conf = new SparkConf(loadDefaults = true).setAll(properties) - val port = conf.getInt("spark.executor.port", 0) - val env = SparkEnv.createExecutorEnv( - conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false) - - executor = new Executor( - executorId, - slaveInfo.getHostname, - env) - } - - override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { - val taskId = taskInfo.getTaskId.getValue.toLong - val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData) - if (executor == null) { - logError("Received launchTask but executor was null") - } else { - SparkHadoopUtil.get.runAsSparkUser { () => - executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber, - taskInfo.getName, taskData.serializedTask) - } - } - } - - override def error(d: ExecutorDriver, message: String) { - logError("Error from Mesos: " + message) - } - - override def killTask(d: ExecutorDriver, t: TaskID) { - if (executor == null) { - logError("Received KillTask but executor was null") - } else { - // TODO: Determine the 'interruptOnCancel' property set for the given job. 
- executor.killTask(t.getValue.toLong, interruptThread = false) - } - } - - override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} - - override def disconnected(d: ExecutorDriver) {} - - override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} - - override def shutdown(d: ExecutorDriver) {} -} - -/** - * Entry point for Mesos executor. - */ -private[spark] object MesosExecutorBackend extends Logging { - def main(args: Array[String]) { - Utils.initDaemon(log) - // Create a new Executor and start it running - val runner = new MesosExecutorBackend() - new MesosExecutorDriver(runner).run() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala deleted file mode 100644 index ed29b34..0000000 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster.mesos - -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.internal.config._ -import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} - -/** - * Cluster Manager for creation of Mesos scheduler and backend - */ -private[spark] class MesosClusterManager extends ExternalClusterManager { - private val MESOS_REGEX = """mesos://(.*)""".r - - override def canCreate(masterURL: String): Boolean = { - masterURL.startsWith("mesos") - } - - override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - new TaskSchedulerImpl(sc) - } - - override def createSchedulerBackend(sc: SparkContext, - masterURL: String, - scheduler: TaskScheduler): SchedulerBackend = { - require(!sc.conf.get(IO_ENCRYPTION_ENABLED), - "I/O encryption is currently not supported in Mesos.") - - val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1) - val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) - if (coarse) { - new MesosCoarseGrainedSchedulerBackend( - scheduler.asInstanceOf[TaskSchedulerImpl], - sc, - mesosUrl, - sc.env.securityManager) - } else { - new MesosFineGrainedSchedulerBackend( - scheduler.asInstanceOf[TaskSchedulerImpl], - sc, - mesosUrl) - } - } - - override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { - scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) - } -} - http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala deleted file mode 100644 index 61ab3e8..0000000 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) { - def createEngine(path: String): MesosClusterPersistenceEngine -} - -/** - * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode - * specific state, so that on failover all the state can be recovered and the scheduler - * can resume managing the drivers. - */ -private[spark] trait MesosClusterPersistenceEngine { - def persist(name: String, obj: Object): Unit - def expunge(name: String): Unit - def fetch[T](name: String): Option[T] - def fetchAll[T](): Iterable[T] -} - -/** - * Zookeeper-backed persistence engine factory. - * All Zk engines created from this factory share the same Zookeeper client, so - * all of them reuse the same connection pool. - */ -private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf) - extends MesosClusterPersistenceEngineFactory(conf) with Logging { - - lazy val zk = SparkCuratorUtil.newClient(conf) - - def createEngine(path: String): MesosClusterPersistenceEngine = { - new ZookeeperMesosClusterPersistenceEngine(path, zk, conf) - } -} - -/** - * Black hole persistence engine factory that creates black hole - * persistence engines, which store nothing. - */ -private[spark] class BlackHoleMesosClusterPersistenceEngineFactory - extends MesosClusterPersistenceEngineFactory(null) { - def createEngine(path: String): MesosClusterPersistenceEngine = { - new BlackHoleMesosClusterPersistenceEngine - } -} - -/** - * Black hole persistence engine that stores nothing. - */ -private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine { - override def persist(name: String, obj: Object): Unit = {} - override def fetch[T](name: String): Option[T] = None - override def expunge(name: String): Unit = {} - override def fetchAll[T](): Iterable[T] = Iterable.empty[T] -} - -/** - * Zookeeper-based Mesos cluster persistence engine that stores cluster mode state - * into Zookeeper. Each engine object operates under one folder in Zookeeper, but - * reuses a shared Zookeeper client. - */ -private[spark] class ZookeeperMesosClusterPersistenceEngine( - baseDir: String, - zk: CuratorFramework, - conf: SparkConf) - extends MesosClusterPersistenceEngine with Logging { - private val WORKING_DIR = - conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir - - SparkCuratorUtil.mkdir(zk, WORKING_DIR) - - def path(name: String): String = { - WORKING_DIR + "/" + name - } - - override def expunge(name: String): Unit = { - zk.delete().forPath(path(name)) - } - - override def persist(name: String, obj: Object): Unit = { - val serialized = Utils.serialize(obj) - val zkPath = path(name) - zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized) - } - - override def fetch[T](name: String): Option[T] = { - val zkPath = path(name) - - try { - val fileData = zk.getData().forPath(zkPath) - Some(Utils.deserialize[T](fileData)) - } catch { - case e: NoNodeException => None - case e: Exception => - logWarning("Exception while reading persisted file, deleting", e) - zk.delete().forPath(zkPath) - None - } - } - - override def fetchAll[T](): Iterable[T] = { - zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T]) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
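As an illustration of the MesosClusterPersistenceEngine contract shown above (persist/expunge/fetch/fetchAll), a minimal in-memory engine might look like the sketch below. The class name InMemoryMesosClusterPersistenceEngine is hypothetical and is not part of the Spark tree; real dispatcher deployments that need failover recovery use the Zookeeper-backed engine from the file above.

package org.apache.spark.scheduler.cluster.mesos

import scala.collection.concurrent.TrieMap

// Hypothetical in-memory engine, shown only to illustrate the trait's contract.
// State lives in the JVM, so nothing survives a dispatcher restart.
private[spark] class InMemoryMesosClusterPersistenceEngine
  extends MesosClusterPersistenceEngine {

  private val store = TrieMap.empty[String, Object]

  override def persist(name: String, obj: Object): Unit = store.put(name, obj)

  override def expunge(name: String): Unit = store.remove(name)

  override def fetch[T](name: String): Option[T] =
    store.get(name).map(_.asInstanceOf[T])

  override def fetchAll[T](): Iterable[T] = store.values.map(_.asInstanceOf[T])
}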
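Similarly, the MesosClusterManager shown above is selected whenever the master URL starts with mesos://, and it builds the coarse-grained backend unless spark.mesos.coarse is set to false. A minimal, hypothetical driver-side configuration exercising that switch might look like the following sketch; the master URL and application name are placeholders, not values from this commit.

import org.apache.spark.{SparkConf, SparkContext}

object MesosBackendSelectionExample {
  def main(args: Array[String]): Unit = {
    // A mesos:// master URL routes scheduler creation through MesosClusterManager;
    // spark.mesos.coarse=false selects the fine-grained backend (the default,
    // true, selects MesosCoarseGrainedSchedulerBackend).
    val conf = new SparkConf()
      .setAppName("mesos-backend-selection-example")  // placeholder app name
      .setMaster("mesos://zk://zk-host:2181/mesos")   // placeholder master URL
      .set("spark.mesos.coarse", "false")

    val sc = new SparkContext(conf)
    try {
      // Trivial job just to confirm the context is usable once the backend is up.
      println(sc.parallelize(1 to 10).sum())
    } finally {
      sc.stop()
    }
  }
}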
