This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 38f0307  [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
38f0307 is described below

commit 38f030725c561979ca98b2a6cc7ca6c02a1f80ed
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Wed Jan 16 20:57:21 2019 -0600

    [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.

    ## What changes were proposed in this pull request?

    The PR makes the hardcoded configs below use `ConfigEntry`.

    * spark.kryo
    * spark.kryoserializer
    * spark.serializer
    * spark.jars
    * spark.files
    * spark.submit
    * spark.deploy
    * spark.worker

    This patch doesn't change configs which are not relevant to SparkConf (e.g. system properties).

    ## How was this patch tested?

    Existing tests.

    Closes #23532 from HeartSaVioR/SPARK-26466-v2.

    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../main/scala/org/apache/spark/SparkConf.scala | 21 +++----
 .../main/scala/org/apache/spark/SparkContext.scala | 4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala | 9 ++-
 .../main/scala/org/apache/spark/api/r/RUtils.scala | 3 +-
 .../apache/spark/deploy/FaultToleranceTest.scala | 6 +-
 .../org/apache/spark/deploy/SparkCuratorUtil.scala | 3 +-
 .../org/apache/spark/deploy/SparkSubmit.scala | 29 +++++----
 .../org/apache/spark/deploy/master/Master.scala | 48 +++++++--------
 .../spark/deploy/master/RecoveryModeFactory.scala | 7 ++-
 .../master/ZooKeeperLeaderElectionAgent.scala | 5 +-
 .../deploy/master/ZooKeeperPersistenceEngine.scala | 15 ++---
 .../apache/spark/deploy/worker/DriverRunner.scala | 6 +-
 .../org/apache/spark/deploy/worker/Worker.scala | 20 +++---
 .../spark/deploy/worker/WorkerArguments.scala | 5 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala | 2 -
 .../org/apache/spark/internal/config/Deploy.scala | 68 ++++++++++++++++++++++
 .../org/apache/spark/internal/config/Kryo.scala | 57 ++++++++++++++++++
 .../org/apache/spark/internal/config/Worker.scala | 63 ++++++++++++++++++++
 .../org/apache/spark/internal/config/package.scala | 31 ++++++++++
 .../apache/spark/serializer/JavaSerializer.scala | 5 +-
 .../apache/spark/serializer/KryoSerializer.scala | 29 ++++-----
 .../main/scala/org/apache/spark/util/Utils.scala | 12 ++--
 .../org/apache/spark/JobCancellationSuite.scala | 3 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala | 3 +-
 .../scala/org/apache/spark/SparkConfSuite.scala | 31 +++++-----
 .../spark/api/python/PythonBroadcastSuite.scala | 3 +-
 .../apache/spark/broadcast/BroadcastSuite.scala | 3 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 34 +++++------
 .../apache/spark/deploy/master/MasterSuite.scala | 14 ++---
 .../deploy/master/PersistenceEngineSuite.scala | 3 +-
 .../deploy/rest/SubmitRestProtocolSuite.scala | 3 +-
 .../apache/spark/deploy/worker/WorkerSuite.scala | 9 +--
 .../apache/spark/scheduler/MapStatusSuite.scala | 2 +-
 .../serializer/GenericAvroSerializerSuite.scala | 3 +-
 .../apache/spark/serializer/KryoBenchmark.scala | 8 ++-
 .../spark/serializer/KryoSerializerBenchmark.scala | 8 ++-
 .../KryoSerializerDistributedSuite.scala | 4 +-
 .../KryoSerializerResizableOutputSuite.scala | 14 +++--
 .../spark/serializer/KryoSerializerSuite.scala | 44 +++++-------
 .../serializer/SerializerPropertiesSuite.scala | 3 +-
 .../serializer/UnsafeKryoSerializerSuite.scala | 6 +-
 .../spark/storage/FlatmapIteratorSuite.scala | 4 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala | 3 +-
 .../collection/ExternalAppendOnlyMapSuite.scala | 4 +-
.../util/collection/ExternalSorterSuite.scala | 7 ++- .../apache/spark/ml/attribute/AttributeSuite.scala | 3 +- .../apache/spark/ml/feature/InstanceSuite.scala | 3 +- .../spark/ml/feature/LabeledPointSuite.scala | 3 +- .../apache/spark/ml/tree/impl/TreePointSuite.scala | 3 +- .../spark/mllib/clustering/KMeansSuite.scala | 3 +- .../apache/spark/mllib/feature/Word2VecSuite.scala | 19 ++++-- .../apache/spark/mllib/linalg/MatricesSuite.scala | 3 +- .../apache/spark/mllib/linalg/VectorsSuite.scala | 3 +- .../spark/mllib/regression/LabeledPointSuite.scala | 3 +- .../distribution/MultivariateGaussianSuite.scala | 3 +- .../k8s/features/BasicDriverFeatureStep.scala | 11 ++-- .../k8s/features/DriverCommandFeatureStep.scala | 13 +++-- .../k8s/features/BasicDriverFeatureStepSuite.scala | 6 +- .../integrationtest/KubernetesTestComponents.scala | 3 +- .../deploy/mesos/MesosClusterDispatcher.scala | 1 + .../org/apache/spark/deploy/mesos/config.scala | 12 ---- .../mesos/MesosClusterPersistenceEngine.scala | 11 ++-- .../cluster/mesos/MesosClusterScheduler.scala | 18 +++--- .../org/apache/spark/deploy/yarn/Client.scala | 6 +- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 3 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 2 +- .../sql/DatasetSerializerRegistratorSuite.scala | 3 +- ...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala | 4 +- .../sql/execution/joins/HashedRelationSuite.scala | 5 +- 69 files changed, 528 insertions(+), 280 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 22bcb81..b596be0 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils @@ -123,7 +124,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria /** Set JAR files to distribute to the cluster. */ def setJars(jars: Seq[String]): SparkConf = { for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor") - set("spark.jars", jars.filter(_ != null).mkString(",")) + set(JARS, jars.filter(_ != null)) } /** Set JAR files to distribute to the cluster. (Java-friendly version.) 
*/ @@ -201,12 +202,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria */ def registerKryoClasses(classes: Array[Class[_]]): SparkConf = { val allClassNames = new LinkedHashSet[String]() - allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').map(_.trim) + allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim) .filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) - set("spark.kryo.classesToRegister", allClassNames.mkString(",")) - set("spark.serializer", classOf[KryoSerializer].getName) + set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq) + set(SERIALIZER, classOf[KryoSerializer].getName) this } @@ -547,20 +548,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria case "yarn-cluster" => logWarning(warning) set("spark.master", "yarn") - set("spark.submit.deployMode", "cluster") + set(SUBMIT_DEPLOY_MODE, "cluster") case "yarn-client" => logWarning(warning) set("spark.master", "yarn") - set("spark.submit.deployMode", "client") + set(SUBMIT_DEPLOY_MODE, "client") case _ => // Any other unexpected master will be checked when creating scheduler backend. } } - if (contains("spark.submit.deployMode")) { - get("spark.submit.deployMode") match { + if (contains(SUBMIT_DEPLOY_MODE)) { + get(SUBMIT_DEPLOY_MODE) match { case "cluster" | "client" => - case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " + - "\"client\".") + case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " + + "\"cluster\" or \"client\".") } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3bbf9f3..c9afc79 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging { def jars: Seq[String] = _jars def files: Seq[String] = _files def master: String = _conf.get("spark.master") - def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client") + def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE) def appName: String = _conf.get("spark.app.name") private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED) @@ -2640,7 +2640,7 @@ object SparkContext extends Logging { case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads) case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) case "yarn" => - if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) { + if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") { conf.getInt(DRIVER_CORES.key, 0) } else { 0 diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ba5ed8a..4d7542c 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -274,14 +274,13 @@ object SparkEnv extends Logging { } } - // Create an instance of the class named by the given SparkConf property, or defaultClassName + // Create an instance of the class named by the given SparkConf property // if the property is not set, possibly initializing it with our conf - def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = { - instantiateClass[T](conf.get(propertyName, defaultClassName)) + def instantiateClassFromConf[T](propertyName: ConfigEntry[String]): T = { + 
instantiateClass[T](conf.get(propertyName)) } - val serializer = instantiateClassFromConf[Serializer]( - "spark.serializer", "org.apache.spark.serializer.JavaSerializer") + val serializer = instantiateClassFromConf[Serializer](SERIALIZER) logDebug(s"Using serializer: ${serializer.getClass}") val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 9bf35af..6832223 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -23,6 +23,7 @@ import java.util.Arrays import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.python.PythonUtils +import org.apache.spark.internal.config._ private[spark] object RUtils { // Local path where R binary packages built from R source code contained in the spark @@ -63,7 +64,7 @@ private[spark] object RUtils { (sys.props("spark.master"), sys.props("spark.submit.deployMode")) } else { val sparkConf = SparkEnv.get.conf - (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode", "client")) + (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE)) } val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster" diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 0679bdf..a662430 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -60,7 +60,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} private object FaultToleranceTest extends App with Logging { private val conf = new SparkConf() - private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + private val zkDir = conf.get(config.Deploy.ZOOKEEPER_DIRECTORY).getOrElse("/spark") private val masters = ListBuffer[TestMasterInfo]() private val workers = ListBuffer[TestWorkerInfo]() @@ -87,8 +87,8 @@ private object FaultToleranceTest extends App with Logging { terminateCluster() // Clear ZK directories in between tests (for speed purposes) - SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader") - SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status") + SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader") + SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status") } test("sanity-basic") { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala index 8247110..8118c01 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala @@ -25,6 +25,7 @@ import org.apache.zookeeper.KeeperException import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL private[spark] object SparkCuratorUtil extends Logging { @@ -35,7 +36,7 @@ private[spark] object SparkCuratorUtil extends Logging { def newClient( conf: SparkConf, - zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = { + zkUrlConf: String = ZOOKEEPER_URL.key): CuratorFramework = { val ZK_URL = conf.get(zkUrlConf) val zk = CuratorFrameworkFactory.newClient(ZK_URL, ZK_SESSION_TIMEOUT_MILLIS, 
ZK_CONNECTION_TIMEOUT_MILLIS, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 57a8bdf..b403cc4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -437,7 +437,7 @@ private[spark] class SparkSubmit extends Logging { } if (localPyFiles != null) { - sparkConf.set("spark.submit.pyFiles", localPyFiles) + sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq) } // In YARN mode for an R app, add the SparkR package archive and the R package @@ -614,11 +614,11 @@ private[spark] class SparkSubmit extends Logging { // For YARN cluster mode, the jar is already distributed on each node as "app.jar" // For python and R files, the primary resource is already distributed as a regular file if (!isYarnCluster && !args.isPython && !args.isR) { - var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) + var jars = sparkConf.get(JARS) if (isUserJar(args.primaryResource)) { jars = jars ++ Seq(args.primaryResource) } - sparkConf.set("spark.jars", jars.mkString(",")) + sparkConf.set(JARS, jars) } // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+). @@ -681,7 +681,7 @@ private[spark] class SparkSubmit extends Logging { // Second argument is main class childArgs += (args.primaryResource, "") if (args.pyFiles != null) { - sparkConf.set("spark.submit.pyFiles", args.pyFiles) + sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq) } } else if (args.isR) { // Second argument is main class @@ -748,18 +748,17 @@ private[spark] class SparkSubmit extends Logging { // Resolve and format python file paths properly before adding them to the PYTHONPATH. // The resolving part is redundant in the case of --py-files, but necessary if the user // explicitly sets `spark.submit.pyFiles` in his/her default properties file. - sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles => - val resolvedPyFiles = Utils.resolveURIs(pyFiles) - val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) { - PythonRunner.formatPaths(resolvedPyFiles).mkString(",") - } else { - // Ignoring formatting python path in yarn and mesos cluster mode, these two modes - // support dealing with remote python files, they could distribute and add python files - // locally. - resolvedPyFiles - } - sparkConf.set("spark.submit.pyFiles", formattedPyFiles) + val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES) + val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(",")) + val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) { + PythonRunner.formatPaths(resolvedPyFiles).mkString(",") + } else { + // Ignoring formatting python path in yarn and mesos cluster mode, these two modes + // support dealing with remote python files, they could distribute and add python files + // locally. 
+ resolvedPyFiles } + sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq) (childArgs, childClasspath, sparkConf, childMainClass) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 32f6d1f..b26da8a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -34,7 +34,9 @@ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.deploy.rest.StandaloneRestServer import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Deploy._ import org.apache.spark.internal.config.UI._ +import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ import org.apache.spark.serializer.{JavaSerializer, Serializer} @@ -56,12 +58,12 @@ private[deploy] class Master( // For application IDs private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) - private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000 - private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) - private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200) - private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) - private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE") - private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10) + private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000 + private val retainedApplications = conf.get(RETAINED_APPLICATIONS) + private val retainedDrivers = conf.get(RETAINED_DRIVERS) + private val reaperIterations = conf.get(REAPER_ITERATIONS) + private val recoveryMode = conf.get(RECOVERY_MODE) + private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES) val workers = new HashSet[WorkerInfo] val idToApp = new HashMap[String, ApplicationInfo] @@ -113,13 +115,13 @@ private[deploy] class Master( // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. - private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true) + private val spreadOutApps = conf.get(SPREAD_OUT_APPS) // Default maxCores for applications that don't specify it (i.e. 
pass Int.MaxValue) - private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) + private val defaultCores = conf.get(DEFAULT_CORES) val reverseProxy = conf.get(UI_REVERSE_PROXY) if (defaultCores < 1) { - throw new SparkException("spark.deploy.defaultCores must be positive") + throw new SparkException(s"${DEFAULT_CORES.key} must be positive") } // Alternative application submission gateway that is stable across Spark versions @@ -151,7 +153,7 @@ private[deploy] class Master( override def run(): Unit = Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) } - }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) + }, 0, workerTimeoutMs, TimeUnit.MILLISECONDS) if (restServerEnabled) { val port = conf.get(MASTER_REST_SERVER_PORT) @@ -168,7 +170,7 @@ private[deploy] class Master( applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) val serializer = new JavaSerializer(conf) - val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { + val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match { case "ZOOKEEPER" => logInfo("Persisting recovery state to ZooKeeper") val zkFactory = @@ -179,7 +181,7 @@ private[deploy] class Master( new FileSystemRecoveryModeFactory(conf, serializer) (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) case "CUSTOM" => - val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory")) + val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY)) val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer]) .newInstance(conf, serializer) .asInstanceOf[StandaloneRecoveryModeFactory] @@ -233,7 +235,7 @@ private[deploy] class Master( override def run(): Unit = Utils.tryLogNonFatalError { self.send(CompleteRecovery) } - }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) + }, workerTimeoutMs, TimeUnit.MILLISECONDS) } case CompleteRecovery => completeRecovery() @@ -311,8 +313,8 @@ private[deploy] class Master( // Important note: this code path is not exercised by tests, so be very careful when // changing this `if` condition. 
if (!normalExit - && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES - && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path + && appInfo.incrementRetryCount() >= maxExecutorRetries + && maxExecutorRetries >= 0) { // < 0 disables this application-killing path val execs = appInfo.executors.values if (!execs.exists(_.state == ExecutorState.RUNNING)) { logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + @@ -870,8 +872,8 @@ private[deploy] class Master( endpointToApp -= app.driver addressToApp -= app.driver.address - if (completedApps.size >= RETAINED_APPLICATIONS) { - val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) + if (completedApps.size >= retainedApplications) { + val toRemove = math.max(retainedApplications / 10, 1) completedApps.take(toRemove).foreach { a => applicationMetricsSystem.removeSource(a.appSource) } @@ -989,14 +991,14 @@ private[deploy] class Master( private def timeOutDeadWorkers() { // Copy the workers into an array so we don't modify the hashset while iterating through it val currentTime = System.currentTimeMillis() - val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray + val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray for (worker <- toRemove) { if (worker.state != WorkerState.DEAD) { logWarning("Removing %s because we got no heartbeat in %d seconds".format( - worker.id, WORKER_TIMEOUT_MS / 1000)) - removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds") + worker.id, workerTimeoutMs / 1000)) + removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds") } else { - if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) { + if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) { workers -= worker // we've seen this DEAD worker in the UI, etc. 
for long enough; cull it } } @@ -1031,8 +1033,8 @@ private[deploy] class Master( case Some(driver) => logInfo(s"Removing driver: $driverId") drivers -= driver - if (completedDrivers.size >= RETAINED_DRIVERS) { - val toRemove = math.max(RETAINED_DRIVERS / 10, 1) + if (completedDrivers.size >= retainedDrivers) { + val toRemove = math.max(retainedDrivers / 10, 1) completedDrivers.trimStart(toRemove) } completedDrivers += driver diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala index ffdd635..4707987 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY import org.apache.spark.serializer.Serializer /** @@ -52,11 +53,11 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serial private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer) extends StandaloneRecoveryModeFactory(conf, serializer) with Logging { - val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "") + val recoveryDir = conf.get(RECOVERY_DIRECTORY) def createPersistenceEngine(): PersistenceEngine = { - logInfo("Persisting recovery state to directory: " + RECOVERY_DIR) - new FileSystemPersistenceEngine(RECOVERY_DIR, serializer) + logInfo("Persisting recovery state to directory: " + recoveryDir) + new FileSystemPersistenceEngine(recoveryDir, serializer) } def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index 1e8dabf..47f3091 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -23,11 +23,12 @@ import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchList import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkCuratorUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Deploy.ZOOKEEPER_DIRECTORY private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable, conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging { - val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" + val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/leader_election" private var zk: CuratorFramework = _ private var leaderLatch: LeaderLatch = _ @@ -38,7 +39,7 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderEle private def start() { logInfo("Starting ZooKeeper LeaderElection agent") zk = SparkCuratorUtil.newClient(conf) - leaderLatch = new LeaderLatch(zk, WORKING_DIR) + leaderLatch = new LeaderLatch(zk, workingDir) leaderLatch.addListener(this) leaderLatch.start() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index af850e4..73dd0de 
100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -28,6 +28,7 @@ import org.apache.zookeeper.CreateMode import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkCuratorUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Deploy._ import org.apache.spark.serializer.Serializer @@ -35,22 +36,22 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer extends PersistenceEngine with Logging { - private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status" private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf) - SparkCuratorUtil.mkdir(zk, WORKING_DIR) + SparkCuratorUtil.mkdir(zk, workingDir) override def persist(name: String, obj: Object): Unit = { - serializeIntoFile(WORKING_DIR + "/" + name, obj) + serializeIntoFile(workingDir + "/" + name, obj) } override def unpersist(name: String): Unit = { - zk.delete().forPath(WORKING_DIR + "/" + name) + zk.delete().forPath(workingDir + "/" + name) } override def read[T: ClassTag](prefix: String): Seq[T] = { - zk.getChildren.forPath(WORKING_DIR).asScala + zk.getChildren.forPath(workingDir).asScala .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]) } @@ -66,13 +67,13 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer } private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = { - val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename) + val fileData = zk.getData().forPath(workingDir + "/" + filename) try { Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData))) } catch { case e: Exception => logWarning("Exception while reading persisted file, deleting", e) - zk.delete().forPath(WORKING_DIR + "/" + filename) + zk.delete().forPath(workingDir + "/" + filename) None } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index a6d13d1..8c2a907 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} @@ -57,8 +58,7 @@ private[deploy] class DriverRunner( @volatile private[worker] var finalException: Option[Exception] = None // Timeout to wait for when trying to terminate a driver. 
- private val DRIVER_TERMINATE_TIMEOUT_MS = - conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s") + private val driverTerminateTimeoutMs = conf.get(WORKER_DRIVER_TERMINATE_TIMEOUT) // Decoupled for testing def setClock(_clock: Clock): Unit = { @@ -122,7 +122,7 @@ private[deploy] class DriverRunner( killed = true synchronized { process.foreach { p => - val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS) + val exitCode = Utils.terminateProcess(p, driverTerminateTimeoutMs) if (exitCode.isEmpty) { logWarning("Failed to terminate driver process: " + p + ". This process will likely be orphaned.") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8c3593c..115450b 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -39,6 +39,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ +import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} @@ -74,7 +75,7 @@ private[deploy] class Worker( // For worker and executor IDs private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) // Send a heartbeat every (heartbeat timeout) / 4 milliseconds - private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 + private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4 // Model retries to connect to the master, after Hadoop's model. // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds) @@ -93,13 +94,11 @@ private[deploy] class Worker( private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60 * REGISTRATION_RETRY_FUZZ_MULTIPLIER)) - private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false) + private val CLEANUP_ENABLED = conf.get(WORKER_CLEANUP_ENABLED) // How often worker will clean up old app folders - private val CLEANUP_INTERVAL_MILLIS = - conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000 + private val CLEANUP_INTERVAL_MILLIS = conf.get(WORKER_CLEANUP_INTERVAL) * 1000 // TTL for app folders/data; after TTL expires it will be cleaned up - private val APP_DATA_RETENTION_SECONDS = - conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) + private val APP_DATA_RETENTION_SECONDS = conf.get(APP_DATA_RETENTION) // Whether or not cleanup the non-shuffle files on executor exits. private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = @@ -111,8 +110,7 @@ private[deploy] class Worker( * Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker * will just use the address received from Master. */ - private val preferConfiguredMasterAddress = - conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false) + private val preferConfiguredMasterAddress = conf.get(PREFER_CONFIGURED_MASTER_ADDRESS) /** * The master address to connect in case of failure. When the connection is broken, worker will * use this address to connect. This is usually just one of `masterRpcAddresses`. 
However, when @@ -143,10 +141,8 @@ private[deploy] class Worker( val appDirectories = new HashMap[String, Seq[String]] val finishedApps = new HashSet[String] - val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors", - WorkerWebUI.DEFAULT_RETAINED_EXECUTORS) - val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers", - WorkerWebUI.DEFAULT_RETAINED_DRIVERS) + val retainedExecutors = conf.get(WORKER_UI_RETAINED_EXECUTORS) + val retainedDrivers = conf.get(WORKER_UI_RETAINED_DRIVERS) // The shuffle service is not actually started unless configured. private val shuffleService = if (externalShuffleServiceSupplier != null) { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 5802812..8c87708 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory import scala.annotation.tailrec import org.apache.spark.SparkConf +import org.apache.spark.internal.config.Worker._ import org.apache.spark.util.{IntParam, MemoryParam, Utils} /** @@ -59,9 +60,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { // This mutates the SparkConf, so all accesses to it must be made after this line propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) - if (conf.contains("spark.worker.ui.port")) { - webUiPort = conf.get("spark.worker.ui.port").toInt - } + conf.get(WORKER_UI_PORT).foreach { webUiPort = _ } checkWorkerMemory() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 5488695..96980c3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -56,6 +56,4 @@ class WorkerWebUI( private[worker] object WorkerWebUI { val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR - val DEFAULT_RETAINED_DRIVERS = 1000 - val DEFAULT_RETAINED_EXECUTORS = 1000 } diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala new file mode 100644 index 0000000..ceab957 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.config + +private[spark] object Deploy { + val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode") + .stringConf + .createWithDefault("NONE") + + val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory") + .stringConf + .createWithDefault("") + + val RECOVERY_DIRECTORY = ConfigBuilder("spark.deploy.recoveryDirectory") + .stringConf + .createWithDefault("") + + val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url") + .doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " + + "configuration is used to set the zookeeper URL to connect to.") + .stringConf + .createOptional + + val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir") + .stringConf + .createOptional + + val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications") + .intConf + .createWithDefault(200) + + val RETAINED_DRIVERS = ConfigBuilder("spark.deploy.retainedDrivers") + .intConf + .createWithDefault(200) + + val REAPER_ITERATIONS = ConfigBuilder("spark.dead.worker.persistence") + .intConf + .createWithDefault(15) + + val MAX_EXECUTOR_RETRIES = ConfigBuilder("spark.deploy.maxExecutorRetries") + .intConf + .createWithDefault(10) + + val SPREAD_OUT_APPS = ConfigBuilder("spark.deploy.spreadOut") + .booleanConf + .createWithDefault(true) + + val DEFAULT_CORES = ConfigBuilder("spark.deploy.defaultCores") + .intConf + .createWithDefault(Int.MaxValue) + + +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala new file mode 100644 index 0000000..7873141 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.config + +import org.apache.spark.network.util.ByteUnit + +private[spark] object Kryo { + + val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired") + .booleanConf + .createWithDefault(false) + + val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator") + .stringConf + .createOptional + + val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister") + .stringConf + .toSequence + .createWithDefault(Nil) + + val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe") + .booleanConf + .createWithDefault(false) + + val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool") + .booleanConf + .createWithDefault(true) + + val KRYO_REFERENCE_TRACKING = ConfigBuilder("spark.kryo.referenceTracking") + .booleanConf + .createWithDefault(true) + + val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer") + .bytesConf(ByteUnit.KiB) + .createWithDefaultString("64k") + + val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max") + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("64m") + +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala new file mode 100644 index 0000000..47f7167 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.config + +import java.util.concurrent.TimeUnit + +private[spark] object Worker { + val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout") + .longConf + .createWithDefault(60) + + val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled") + .booleanConf + .createWithDefault(false) + + val WORKER_CLEANUP_INTERVAL = ConfigBuilder("spark.worker.cleanup.interval") + .longConf + .createWithDefault(60 * 30) + + val APP_DATA_RETENTION = ConfigBuilder("spark.worker.cleanup.appDataTtl") + .longConf + .createWithDefault(7 * 24 * 3600) + + val PREFER_CONFIGURED_MASTER_ADDRESS = ConfigBuilder("spark.worker.preferConfiguredMasterAddress") + .booleanConf + .createWithDefault(false) + + val WORKER_UI_PORT = ConfigBuilder("spark.worker.ui.port") + .intConf + .createOptional + + val WORKER_UI_RETAINED_EXECUTORS = ConfigBuilder("spark.worker.ui.retainedExecutors") + .intConf + .createWithDefault(1000) + + val WORKER_UI_RETAINED_DRIVERS = ConfigBuilder("spark.worker.ui.retainedDrivers") + .intConf + .createWithDefault(1000) + + val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF = + ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize") + .intConf + .createWithDefault(100) +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0e78637..99ce220 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -961,4 +961,35 @@ package object config { .intConf .createWithDefault(4) + private[spark] val SERIALIZER = ConfigBuilder("spark.serializer") + .stringConf + .createWithDefault("org.apache.spark.serializer.JavaSerializer") + + private[spark] val SERIALIZER_OBJECT_STREAM_RESET = + ConfigBuilder("spark.serializer.objectStreamReset") + .intConf + .createWithDefault(100) + + private[spark] val SERIALIZER_EXTRA_DEBUG_INFO = ConfigBuilder("spark.serializer.extraDebugInfo") + .booleanConf + .createWithDefault(true) + + private[spark] val JARS = ConfigBuilder("spark.jars") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val FILES = ConfigBuilder("spark.files") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode") + .stringConf + .createWithDefault("client") + + private[spark] val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles") + .stringConf + .toSequence + .createWithDefault(Nil) } diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index f60dcfd..70564ee 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,6 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.config._ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} private[spark] class JavaSerializationStream( @@ -137,8 +138,8 @@ private[spark] class JavaSerializerInstance( */ @DeveloperApi class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable { - private 
var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100) - private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true) + private var counterReset = conf.get(SERIALIZER_OBJECT_STREAM_RESET) + private var extraDebugInfo = conf.get(SERIALIZER_EXTRA_DEBUG_INFO) protected def this() = this(new SparkConf()) // For deserialization only diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 72ca0fb..2df133d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -39,6 +39,7 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ @@ -58,34 +59,34 @@ class KryoSerializer(conf: SparkConf) with Logging with Serializable { - private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") + private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE) if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { - throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " + + throw new IllegalArgumentException(s"${KRYO_SERIALIZER_BUFFER_SIZE.key} must be less than " + s"2048 MiB, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} MiB.") } private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt - val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt + val maxBufferSizeMb = conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE).toInt if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) { - throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " + - s"2048 MiB, got: + $maxBufferSizeMb MiB.") + throw new IllegalArgumentException(s"${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} must be less " + + s"than 2048 MiB, got: $maxBufferSizeMb MiB.") } private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt - private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) - private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val userRegistrators = conf.get("spark.kryo.registrator", "") - .split(',').map(_.trim) + private val referenceTracking = conf.get(KRYO_REFERENCE_TRACKING) + private val registrationRequired = conf.get(KRYO_REGISTRATION_REQUIRED) + private val userRegistrators = conf.get(KRYO_USER_REGISTRATORS) + .map(_.trim) .filter(!_.isEmpty) - private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") - .split(',').map(_.trim) + private val classesToRegister = conf.get(KRYO_CLASSES_TO_REGISTER) + .map(_.trim) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema // whether to use unsafe based IO for serialization - private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) - private val usePool = conf.getBoolean("spark.kryo.pool", true) + private val useUnsafe = conf.get(KRYO_USE_UNSAFE) + private val usePool = conf.get(KRYO_USE_POOL) def newKryoOutput(): KryoOutput = if (useUnsafe) { @@ -407,7 +408,7 @@ private[spark] class KryoSerializerInstance( } catch { case e: KryoException if e.getMessage.startsWith("Buffer overflow") => throw new SparkException(s"Kryo 
serialization failed: ${e.getMessage}. To avoid this, " + - "increase spark.kryoserializer.buffer.max value.", e) + s"increase ${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} value.", e) } finally { releaseKryo(kryo) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 83d1b2b..7416559 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -62,6 +62,7 @@ import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ +import org.apache.spark.internal.config.Worker._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} @@ -1457,16 +1458,12 @@ private[spark] object Utils extends Logging { CallSite(shortForm, longForm) } - private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF = - "spark.worker.ui.compressedLogFileLengthCacheSize" - private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100 private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null private def getCompressedLogFileLengthCache( sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized { if (compressedLogFileLengthCache == null) { - val compressedLogFileLengthCacheSize = sparkConf.getInt( - UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF, - DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE) + val compressedLogFileLengthCacheSize = sparkConf.get( + UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF) compressedLogFileLengthCache = CacheBuilder.newBuilder() .maximumSize(compressedLogFileLengthCacheSize) .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() { @@ -2535,8 +2532,7 @@ private[spark] object Utils extends Logging { * has its own mechanism to distribute jars. */ def getUserJars(conf: SparkConf): Seq[String] = { - val sparkJars = conf.getOption("spark.jars") - sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten + conf.get(JARS).filter(_.nonEmpty) } /** diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index 61da413..f8adaf5 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ import org.scalatest.BeforeAndAfter import org.scalatest.Matchers +import org.apache.spark.internal.config.Deploy._ import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.util.ThreadUtils @@ -256,7 +257,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft .set("spark.task.reaper.enabled", "true") .set("spark.task.reaper.killTimeout", "-1") .set("spark.task.reaper.PollingInterval", "1s") - .set("spark.deploy.maxExecutorRetries", "1") + .set(MAX_EXECUTOR_RETRIES, 1) sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) // Add a listener to release the semaphore once any tasks are launched. 
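
The new config objects introduced above (Deploy, Kryo, Worker, plus the additions to the config package object) all follow the same ConfigBuilder pattern: declare a key once, give it a type and a default (or make it optional), then read it back with conf.get. A minimal sketch of that pattern follows, assuming code that lives inside Spark core (the builder API is Spark-internal, not public) and using a hypothetical spark.example.* namespace that is not part of this patch:

    package org.apache.spark.internal.config

    import java.util.concurrent.TimeUnit

    // Hypothetical entries, for illustration only.
    private[spark] object Example {

      // Typed entry with a default: readers get an Int back, never a raw String.
      val EXAMPLE_RETAINED_ITEMS = ConfigBuilder("spark.example.retainedItems")
        .intConf
        .createWithDefault(200)

      // Optional entry: conf.get(...) yields Option[String] instead of throwing when unset.
      val EXAMPLE_DIRECTORY = ConfigBuilder("spark.example.dir")
        .stringConf
        .createOptional

      // Time-typed entry: accepts strings like "10s" and is read back in milliseconds.
      val EXAMPLE_TIMEOUT = ConfigBuilder("spark.example.timeout")
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefaultString("10s")
    }

A call site then reads conf.get(Example.EXAMPLE_RETAINED_ITEMS) and gets an Int (200 when unset), replacing conf.getInt("spark.example.retainedItems", 200), so the key, type, and default live in one place.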
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index ffa7042..3203f8f 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService import org.scalatest.Matchers import org.apache.spark.ShuffleSuite.NonJavaSerializableClass +import org.apache.spark.internal.config.SERIALIZER import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD} @@ -215,7 +216,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC test("sort with Java non serializable class - Kryo") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + val myConf = conf.clone().set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf) val a = sc.parallelize(1 to 10, 2) val b = a.map { x => diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index a8849ab..4071dd4 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} import org.apache.spark.util.{ResetSystemProperties, RpcUtils} @@ -78,7 +79,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(conf.get("spark.master") === "local[3]") assert(conf.get("spark.app.name") === "My app") assert(conf.get("spark.home") === "/path") - assert(conf.get("spark.jars") === "a.jar,b.jar") + assert(conf.get(JARS) === Seq("a.jar", "b.jar")) assert(conf.get("spark.executorEnv.VAR1") === "value1") assert(conf.get("spark.executorEnv.VAR2") === "value2") assert(conf.get("spark.executorEnv.VAR3") === "value3") @@ -86,7 +87,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst // Test the Java-friendly versions of these too conf.setJars(Array("c.jar", "d.jar")) conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5"))) - assert(conf.get("spark.jars") === "c.jar,d.jar") + assert(conf.get(JARS) === Seq("c.jar", "d.jar")) assert(conf.get("spark.executorEnv.VAR4") === "value4") assert(conf.get("spark.executorEnv.VAR5") === "value5") } @@ -182,19 +183,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } test("register kryo classes through registerKryoClasses") { - val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true) conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2])) - assert(conf.get("spark.kryo.classesToRegister") === - classOf[Class1].getName + "," + classOf[Class2].getName) + assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === + Seq(classOf[Class1].getName, classOf[Class2].getName).toSet) 
conf.registerKryoClasses(Array(classOf[Class3])) - assert(conf.get("spark.kryo.classesToRegister") === - classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === + Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet) conf.registerKryoClasses(Array(classOf[Class2])) - assert(conf.get("spark.kryo.classesToRegister") === - classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === + Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. @@ -205,12 +206,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } test("register kryo classes through registerKryoClasses and custom registrator") { - val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true) conf.registerKryoClasses(Array(classOf[Class1])) - assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName) + assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet) - conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName) + conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. @@ -220,9 +221,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } test("register kryo classes through conf") { - val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") - conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") - conf.set("spark.serializer", classOf[KryoSerializer].getName) + val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true) + conf.set(KRYO_CLASSES_TO_REGISTER, Seq("java.lang.StringBuffer")) + conf.set(SERIALIZER, classOf[KryoSerializer].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. 
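
The SparkConfSuite changes above show the other half of the migration: tests, and any Spark-internal caller, set and read entries with their native types instead of hand-joined strings. A small sketch of that usage, assuming code inside the org.apache.spark package since the typed set/get overloads on SparkConf are Spark-internal:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.SERIALIZER
    import org.apache.spark.internal.config.Kryo._
    import org.apache.spark.serializer.KryoSerializer

    val conf = new SparkConf()
      .set(SERIALIZER, classOf[KryoSerializer].getName)              // String-typed entry
      .set(KRYO_REGISTRATION_REQUIRED, true)                         // Boolean, not the string "true"
      .set(KRYO_CLASSES_TO_REGISTER, Seq("java.lang.StringBuffer"))  // Seq[String], not "a,b"

    val classes: Seq[String] = conf.get(KRYO_CLASSES_TO_REGISTER)    // already split on commas
    val required: Boolean = conf.get(KRYO_REGISTRATION_REQUIRED)     // false when unset

The raw keys (spark.serializer, spark.kryo.classesToRegister, and so on) are unchanged, so user-facing configuration files and --conf flags keep working as before.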
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala index 7407a65..24004de 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala @@ -24,6 +24,7 @@ import scala.io.Source import org.scalatest.Matchers import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils @@ -48,7 +49,7 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC } val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath) assertBroadcastIsValid(broadcast) - val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true) val deserializedBroadcast = Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance()) assertBroadcastIsValid(deserializedBroadcast) diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 6d74812..18ec60d 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.Assertions import org.apache.spark._ import org.apache.spark.internal.config +import org.apache.spark.internal.config.SERIALIZER import org.apache.spark.io.SnappyCompressionCodec import org.apache.spark.rdd.RDD import org.apache.spark.security.EncryptionFunSuite @@ -68,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { conf => val numSlaves = 4 - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") conf.set(config.BROADCAST_COMPRESS, true) sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf) val list = List[Int](1, 2, 3, 4) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index c6e961e..30efbb0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -221,7 +221,7 @@ class SparkSubmitSuite val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs) appArgs.deployMode should be ("client") - conf.get("spark.submit.deployMode") should be ("client") + conf.get(SUBMIT_DEPLOY_MODE) should be ("client") // Both cmd line and configuration are specified, cmdline option takes the priority val clArgs1 = Seq( @@ -235,7 +235,7 @@ class SparkSubmitSuite val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1) appArgs1.deployMode should be ("cluster") - conf1.get("spark.submit.deployMode") should be ("cluster") + conf1.get(SUBMIT_DEPLOY_MODE) should be ("cluster") // Neither cmdline nor configuration are specified, client mode is the default choice val clArgs2 = Seq( @@ -248,7 +248,7 @@ class SparkSubmitSuite val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2) appArgs2.deployMode should be ("client") - conf2.get("spark.submit.deployMode") should be ("client") + 
conf2.get(SUBMIT_DEPLOY_MODE) should be ("client") } test("handles YARN cluster mode") { @@ -374,12 +374,12 @@ class SparkSubmitSuite val confMap = conf.getAll.toMap confMap.keys should contain ("spark.master") confMap.keys should contain ("spark.app.name") - confMap.keys should contain ("spark.jars") + confMap.keys should contain (JARS.key) confMap.keys should contain ("spark.driver.memory") confMap.keys should contain ("spark.driver.cores") confMap.keys should contain ("spark.driver.supervise") confMap.keys should contain (UI_ENABLED.key) - confMap.keys should contain ("spark.submit.deployMode") + confMap.keys should contain (SUBMIT_DEPLOY_MODE.key) conf.get(UI_ENABLED) should be (false) } @@ -467,7 +467,7 @@ class SparkSubmitSuite val (_, _, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs) conf.get("spark.executor.memory") should be ("5g") conf.get("spark.master") should be ("yarn") - conf.get("spark.submit.deployMode") should be ("cluster") + conf.get(SUBMIT_DEPLOY_MODE) should be ("cluster") mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS) } @@ -662,7 +662,7 @@ class SparkSubmitSuite val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs) appArgs.jars should be(Utils.resolveURIs(jars)) appArgs.files should be(Utils.resolveURIs(files)) - conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar")) + conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq) conf.get("spark.files") should be(Utils.resolveURIs(files)) // Test files and archives (Yarn) @@ -692,8 +692,8 @@ class SparkSubmitSuite val appArgs3 = new SparkSubmitArguments(clArgs3) val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3) appArgs3.pyFiles should be(Utils.resolveURIs(pyFiles)) - conf3.get("spark.submit.pyFiles") should be( - PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) + conf3.get(SUBMIT_PYTHON_FILES) should be( + PythonRunner.formatPaths(Utils.resolveURIs(pyFiles))) conf3.get(PYSPARK_DRIVER_PYTHON.key) should be("python3.4") conf3.get(PYSPARK_PYTHON.key) should be("python3.5") } @@ -744,8 +744,8 @@ class SparkSubmitSuite ) val appArgs = new SparkSubmitArguments(clArgs) val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs) - conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar")) - conf.get("spark.files") should be(Utils.resolveURIs(files)) + conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq) + conf.get(FILES) should be(Utils.resolveURIs(files).split(",").toSeq) // Test files and archives (Yarn) val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir) @@ -776,8 +776,8 @@ class SparkSubmitSuite ) val appArgs3 = new SparkSubmitArguments(clArgs3) val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3) - conf3.get("spark.submit.pyFiles") should be( - PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) + conf3.get(SUBMIT_PYTHON_FILES) should be( + PythonRunner.formatPaths(Utils.resolveURIs(pyFiles))) // Test remote python files val hadoopConf = new Configuration() @@ -798,7 +798,7 @@ class SparkSubmitSuite val appArgs4 = new SparkSubmitArguments(clArgs4) val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf)) // Should not format python path for yarn cluster mode - conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles)) + conf4.get(SUBMIT_PYTHON_FILES) should be(Utils.resolveURIs(remotePyFiles).split(",")) } } @@ -1024,7 +1024,7 @@ class SparkSubmitSuite 
conf.get("spark.repl.local.jars") should (startWith("file:")) // local py files should not be a URI format. - conf.get("spark.submit.pyFiles") should (startWith("/")) + conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) } } } @@ -1155,7 +1155,7 @@ class SparkSubmitSuite val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf)) conf.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}") - conf.get("spark.submit.pyFiles") should (startWith("/")) + conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) } // Verify "spark.submit.pyFiles" val args1 = Seq( @@ -1171,7 +1171,7 @@ class SparkSubmitSuite val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf)) conf1.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}") - conf1.get("spark.submit.pyFiles") should (startWith("/")) + conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 5904d03..fbf2acc 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -40,7 +40,9 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy._ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Deploy._ import org.apache.spark.internal.config.UI._ +import org.apache.spark.internal.config.Worker._ import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.serializer @@ -103,9 +105,8 @@ class MasterSuite extends SparkFunSuite test("can use a custom recovery mode factory") { val conf = new SparkConf(loadDefaults = false) - conf.set("spark.deploy.recoveryMode", "CUSTOM") - conf.set("spark.deploy.recoveryMode.factory", - classOf[CustomRecoveryModeFactory].getCanonicalName) + conf.set(RECOVERY_MODE, "CUSTOM") + conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName) conf.set(MASTER_REST_SERVER_ENABLED, false) val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts @@ -188,9 +189,8 @@ class MasterSuite extends SparkFunSuite test("master correctly recover the application") { val conf = new SparkConf(loadDefaults = false) - conf.set("spark.deploy.recoveryMode", "CUSTOM") - conf.set("spark.deploy.recoveryMode.factory", - classOf[FakeRecoveryModeFactory].getCanonicalName) + conf.set(RECOVERY_MODE, "CUSTOM") + conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName) conf.set(MASTER_REST_SERVER_ENABLED, false) val fakeAppInfo = makeAppInfo(1024) @@ -637,7 +637,7 @@ class MasterSuite extends SparkFunSuite } test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") { - val conf = new SparkConf().set("spark.worker.timeout", "1") + val conf = new SparkConf().set(WORKER_TIMEOUT, 1L) val master = makeMaster(conf) master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) eventually(timeout(10.seconds)) { diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 3027865..3d8a46b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomUtils import org.apache.curator.test.TestingServer import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL import org.apache.spark.rpc.{RpcEndpoint, RpcEnv} import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.util.Utils @@ -48,7 +49,7 @@ class PersistenceEngineSuite extends SparkFunSuite { val zkTestServer = new TestingServer(findFreePort(conf)) try { testPersistenceEngine(conf, serializer => { - conf.set("spark.deploy.zookeeper.url", zkTestServer.getConnectString) + conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString) new ZooKeeperPersistenceEngine(conf, serializer) }) } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala index 75c50af..87655f3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala @@ -22,6 +22,7 @@ import java.lang.Boolean import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils /** @@ -93,7 +94,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { message.sparkProperties = conf.getAll.toMap message.validate() // optional fields - conf.set("spark.jars", "mayonnaise.jar,ketchup.jar") + conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar")) conf.set("spark.files", "fireball.png") conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m") conf.set("spark.driver.cores", "180") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index e5e5b5e..0ddf38c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService} import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged} import org.apache.spark.deploy.master.DriverState +import org.apache.spark.internal.config.Worker._ import org.apache.spark.rpc.{RpcAddress, RpcEnv} class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { @@ -100,7 +101,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { test("test clearing of finishedExecutors (small number of executors)") { val conf = new SparkConf() - conf.set("spark.worker.ui.retainedExecutors", 2.toString) + conf.set(WORKER_UI_RETAINED_EXECUTORS, 2) val worker = makeWorker(conf) // initialize workers for (i <- 0 until 5) { @@ -124,7 +125,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { test("test clearing of finishedExecutors (more executors)") { val conf = new SparkConf() - conf.set("spark.worker.ui.retainedExecutors", 30.toString) + conf.set(WORKER_UI_RETAINED_EXECUTORS, 30) val worker = makeWorker(conf) // initialize workers for (i <- 0 until 50) { @@ -157,7 +158,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { test("test clearing of finishedDrivers (small number of 
drivers)") { val conf = new SparkConf() - conf.set("spark.worker.ui.retainedDrivers", 2.toString) + conf.set(WORKER_UI_RETAINED_DRIVERS, 2) val worker = makeWorker(conf) // initialize workers for (i <- 0 until 5) { @@ -181,7 +182,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { test("test clearing of finishedDrivers (more drivers)") { val conf = new SparkConf() - conf.set("spark.worker.ui.retainedDrivers", 30.toString) + conf.set(WORKER_UI_RETAINED_DRIVERS, 30) val worker = makeWorker(conf) // initialize workers for (i <- 0 until 50) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index f41ffb7..c1e7fb9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -181,7 +181,7 @@ class MapStatusSuite extends SparkFunSuite { test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") { val conf = new SparkConf() - .set("spark.serializer", classOf[KryoSerializer].getName) + .set(config.SERIALIZER, classOf[KryoSerializer].getName) .setMaster("local") .setAppName("SPARK-21133") withSpark(new SparkContext(conf)) { sc => diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 3734f1c..8610b18 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -25,9 +25,10 @@ import org.apache.avro.{Schema, SchemaBuilder} import org.apache.avro.generic.GenericData.Record import org.apache.spark.{SharedSparkContext, SparkFunSuite} +import org.apache.spark.internal.config.SERIALIZER class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") val schema : Schema = SchemaBuilder .record("testRecord").fields() diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index d7730f2..fd228cd 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -22,6 +22,8 @@ import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoTest._ /** @@ -122,9 +124,9 @@ object KryoBenchmark extends BenchmarkBase { def createSerializer(useUnsafe: Boolean): SerializerInstance = { val conf = new SparkConf() - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) - conf.set("spark.kryo.unsafe", useUnsafe.toString) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName) + conf.set(KRYO_USE_UNSAFE, useUnsafe) new KryoSerializer(conf).newInstance() } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala index 2a15c6f..2915b99 
100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala @@ -23,6 +23,8 @@ import scala.concurrent.duration._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoTest._ import org.apache.spark.util.ThreadUtils @@ -69,9 +71,9 @@ object KryoSerializerBenchmark extends BenchmarkBase { def createSparkContext(usePool: Boolean): SparkContext = { val conf = new SparkConf() - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) - conf.set("spark.kryo.pool", usePool.toString) + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName) + conf.set(KRYO_USE_POOL, usePool) if (sc != null) { sc.stop() diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala index 46aa9c3..ae87109 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala @@ -28,8 +28,8 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex test("kryo objects are serialised consistently in different processes") { val conf = new SparkConf(false) - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName) + .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName) .set(config.MAX_TASK_FAILURES, 1) .set(config.BLACKLIST_ENABLED, false) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala index cf01f79..25f0b19 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala @@ -21,6 +21,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.LocalSparkContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkException +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Kryo._ class KryoSerializerResizableOutputSuite extends SparkFunSuite { @@ -29,9 +31,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite { test("kryo without resizable output buffer should fail on large array") { val conf = new SparkConf(false) - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryoserializer.buffer", "1m") - conf.set("spark.kryoserializer.buffer.max", "1m") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m") + conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "1m") withSpark(new SparkContext("local", "test", conf)) { sc => intercept[SparkException](sc.parallelize(x).collect()) } @@ -39,9 +41,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite { test("kryo with 
resizable output buffer should succeed on large array") { val conf = new SparkConf(false) - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryoserializer.buffer", "1m") - conf.set("spark.kryoserializer.buffer.max", "2m") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m") + conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "2m") withSpark(new SparkContext("local", "test", conf)) { sc => assert(sc.parallelize(x).collect() === x) } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 8af5327..41fb405 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -32,19 +32,21 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import org.roaringbitmap.RoaringBitmap import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{ThreadUtils, Utils} class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) - conf.set("spark.kryo.unsafe", "false") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName) + conf.set(KRYO_USE_UNSAFE, false) test("SPARK-7392 configuration limits") { - val kryoBufferProperty = "spark.kryoserializer.buffer" - val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max" + val kryoBufferProperty = KRYO_SERIALIZER_BUFFER_SIZE.key + val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key def newKryoInstance( conf: SparkConf, @@ -81,7 +83,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("basic types") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { @@ -114,7 +116,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("pairs") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { @@ -141,7 +143,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("Scala data structures") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { @@ -169,7 +171,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("Bug: SPARK-10251") { - val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true")) + val ser = new KryoSerializer(conf.clone.set(KRYO_REGISTRATION_REQUIRED, true)) .newInstance() def check[T: ClassTag](t: T) { assert(ser.deserialize[T](ser.serialize(t)) === t) @@ -253,7 +255,7 @@ 
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { hashMap.put("foo", "bar") check(hashMap) - System.clearProperty("spark.kryo.registrator") + System.clearProperty(KRYO_USER_REGISTRATORS.key) } test("kryo with collect") { @@ -310,7 +312,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { import org.apache.spark.SparkException val conf = new SparkConf(false) - conf.set("spark.kryo.registrator", "this.class.does.not.exist") + conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist") val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1)) assert(thrown.getMessage.contains("Failed to register classes with Kryo")) @@ -337,7 +339,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("registration of HighlyCompressedMapStatus") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) // these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16 // values, and they use a bitmap (dense) if they have more than 4096 values, and an @@ -355,7 +357,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("serialization buffer overflow reporting") { import org.apache.spark.SparkException - val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max" + val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key val largeObject = (1 to 1000000).toArray @@ -409,7 +411,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("getAutoReset") { val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance] assert(ser.getAutoReset) - val conf = new SparkConf().set("spark.kryo.registrator", + val conf = new SparkConf().set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] assert(!ser2.getAutoReset) @@ -438,10 +440,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { private def testSerializerInstanceReuse( autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = { val conf = new SparkConf(loadDefaults = false) - .set("spark.kryo.referenceTracking", referenceTracking.toString) - .set("spark.kryo.pool", usePool.toString) + .set(KRYO_REFERENCE_TRACKING, referenceTracking) + .set(KRYO_USE_POOL, usePool) if (!autoReset) { - conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName) + conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) } val ser = new KryoSerializer(conf) val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance] @@ -478,7 +480,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor( Executors.newFixedThreadPool(4)) - val ser = new KryoSerializer(conf.clone.set("spark.kryo.pool", "true")) + val ser = new KryoSerializer(conf.clone.set(KRYO_USE_POOL, true)) val tests = mutable.ListBuffer[Future[Boolean]]() @@ -519,9 +521,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext { - conf.set("spark.serializer", classOf[KryoSerializer].getName) - conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName) - conf.set("spark.kryo.referenceTracking", "true") 
+ conf.set(SERIALIZER, classOf[KryoSerializer].getName) + conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) + conf.set(KRYO_REFERENCE_TRACKING, true) conf.set("spark.shuffle.manager", "sort") conf.set("spark.shuffle.sort.bypassMergeThreshold", "200") diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala index 99882bf..dad080c 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala @@ -24,6 +24,7 @@ import scala.util.Random import org.scalatest.Assertions import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset /** @@ -50,7 +51,7 @@ class SerializerPropertiesSuite extends SparkFunSuite { } test("KryoSerializer does not support relocation when auto-reset is disabled") { - val conf = new SparkConf().set("spark.kryo.registrator", + val conf = new SparkConf().set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) val ser = new KryoSerializer(conf) assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()) diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala index d63a45a..126ba0e 100644 --- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala @@ -17,17 +17,19 @@ package org.apache.spark.serializer +import org.apache.spark.internal.config.Kryo._ + class UnsafeKryoSerializerSuite extends KryoSerializerSuite { // This test suite should run all tests in KryoSerializerSuite with kryo unsafe. override def beforeAll() { - conf.set("spark.kryo.unsafe", "true") + conf.set(KRYO_USE_UNSAFE, true) super.beforeAll() } override def afterAll() { - conf.set("spark.kryo.unsafe", "false") + conf.set(KRYO_USE_UNSAFE, false) super.afterAll() } } diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala index 4282850..fc16fe3 100644 --- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage import org.apache.spark._ - +import org.apache.spark.internal.config._ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext { /* Tests the ability of Spark to deal with user provided iterators from flatMap @@ -55,7 +55,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext { test("Serializer Reset") { val sconf = new SparkConf().setMaster("local").setAppName("serializer_reset_test") - .set("spark.serializer.objectStreamReset", "10") + .set(SERIALIZER_OBJECT_STREAM_RESET, 10) sc = new SparkContext(sconf) val expand_size = 500 val data = sc.parallelize(Seq(1, 2)). 
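The serializer-related suites above all follow the same mechanical rewrite: the raw string key and a stringified value are replaced by the ConfigEntry and a value of its declared type, which is also why assertions now compare against Seq(...) rather than comma-joined strings. A minimal, hypothetical snippet showing the difference (the object below is not part of the patch; the typed set/get overloads on SparkConf are package-private to Spark, so a sketch has to live under org.apache.spark):

package org.apache.spark

import org.apache.spark.internal.config._

// Hypothetical example, not part of this commit.
object TypedConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)

    // Old style: key and value are both strings; a typo in either only fails at runtime.
    conf.set("spark.serializer.objectStreamReset", "1")

    // New style: the ConfigEntry carries key, type and default, so the value is
    // type-checked and conf.get returns an Int instead of a String to parse.
    conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
    val reset: Int = conf.get(SERIALIZER_OBJECT_STREAM_RESET)

    // Sequence-valued entries such as spark.jars round-trip as Seq[String].
    conf.set(JARS, Seq("a.jar", "b.jar"))
    val jars: Seq[String] = conf.get(JARS)

    assert(reset == 1)
    assert(jars == Seq("a.jar", "b.jar"))
  }
}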
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index d3f94fb..7aca0ad 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.SparkListener @@ -829,7 +830,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("isDynamicAllocationEnabled") { val conf = new SparkConf() conf.set("spark.master", "yarn") - conf.set("spark.submit.deployMode", "client") + conf.set(SUBMIT_DEPLOY_MODE, "client") assert(Utils.isDynamicAllocationEnabled(conf) === false) assert(Utils.isDynamicAllocationEnabled( conf.set("spark.dynamicAllocation.enabled", "false")) === false) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 5efbf4e..de70153 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -54,8 +54,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val conf = new SparkConf(loadDefaults) // Make the Java serializer write a reset instruction (TC_RESET) after each object to test // for a bug we had with bytes written past the last object in a batch (SPARK-2792) - conf.set("spark.serializer.objectStreamReset", "1") - conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1) + conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer") conf.set("spark.shuffle.spill.compress", codec.isDefined.toString) conf.set("spark.shuffle.compress", codec.isDefined.toString) codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) } diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 14148e0..3006409 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark._ +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_MEMORY import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} @@ -268,12 +269,12 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = { val conf = new SparkConf(loadDefaults) if (kryo) { - conf.set("spark.serializer", classOf[KryoSerializer].getName) + conf.set(SERIALIZER, classOf[KryoSerializer].getName) } else { // Make the Java serializer write a reset instruction (TC_RESET) after each object to test // for a bug we had with bytes written past the last object in a batch (SPARK-2792) - conf.set("spark.serializer.objectStreamReset", "1") - conf.set("spark.serializer", classOf[JavaSerializer].getName) + 
conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1) + conf.set(SERIALIZER, classOf[JavaSerializer].getName) } conf.set("spark.shuffle.sort.bypassMergeThreshold", "0") // Ensure that we actually have multiple batches per spill file diff --git a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala index eb5f3ca..7f892fd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.attribute import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.types._ @@ -225,7 +226,7 @@ class AttributeSuite extends SparkFunSuite { test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index cca7399..5a74490 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.ml.feature import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.serializer.KryoSerializer class InstanceSuite extends SparkFunSuite{ test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala index 05c7a58..63c1635 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.ml.feature import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.serializer.KryoSerializer class LabeledPointSuite extends SparkFunSuite { test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala index f41abe4..3a44e79 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala @@ -18,12 +18,13 @@ package org.apache.spark.ml.tree.impl import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.serializer.KryoSerializer class TreePointSuite extends SparkFunSuite { test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new 
KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index d18cef7..c4bf5b2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering import scala.util.Random import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} import org.apache.spark.mllib.util.TestingUtils._ @@ -316,7 +317,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext { test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index f4fa216..a679fe4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -18,8 +18,10 @@ package org.apache.spark.mllib.feature import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.util.Utils class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { @@ -109,12 +111,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { test("big model load / save") { // backupping old values - val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m") - val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k") + val oldBufferConfValue = spark.conf.get(KRYO_SERIALIZER_BUFFER_SIZE.key, "64m") + val oldBufferMaxConfValue = spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64k") + val oldSetCommandRejectsSparkCoreConfs = spark.conf.get( + SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "true") // setting test values to trigger partitioning - spark.conf.set("spark.kryoserializer.buffer", "50b") - spark.conf.set("spark.kryoserializer.buffer.max", "50b") + + // this is needed to set configurations which are also defined to SparkConf + spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "false") + spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "50b") // create a model bigger than 50 Bytes val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*) @@ -137,8 +143,9 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { "that spans over multiple partitions", t) } finally { Utils.deleteRecursively(tempDir) - spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue) - spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue) + spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, oldBufferConfValue) + spark.conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, oldBufferMaxConfValue) + spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, oldSetCommandRejectsSparkCoreConfs) } } diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 2c3f846..b4520d4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -27,6 +27,7 @@ import org.mockito.Mockito.when import org.scalatest.mockito.MockitoSugar._ import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.{linalg => newlinalg} import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.serializer.KryoSerializer @@ -34,7 +35,7 @@ import org.apache.spark.serializer.KryoSerializer class MatricesSuite extends SparkFunSuite { test("kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 217b4a3..fee0b02 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -25,6 +25,7 @@ import org.json4s.jackson.JsonMethods.{parse => parseJson} import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.{linalg => newlinalg} import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.serializer.KryoSerializer @@ -38,7 +39,7 @@ class VectorsSuite extends SparkFunSuite with Logging { test("kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala index c1449ec..d3366dc 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.serializer.KryoSerializer @@ -57,7 +58,7 @@ class LabeledPointSuite extends SparkFunSuite { test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala index 5b4a260..4c88fd3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.stat.distribution import org.apache.spark.{SparkConf, SparkFunSuite} 
+import org.apache.spark.internal.config.Kryo._ import org.apache.spark.mllib.linalg.{Matrices, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -83,7 +84,7 @@ class MultivariateGaussianSuite extends SparkFunSuite with MLlibTestSparkContext test("Kryo class register") { val conf = new SparkConf(false) - conf.set("spark.kryo.registrationRequired", "true") + conf.set(KRYO_REGISTRATION_REQUIRED, true) val ser = new KryoSerializer(conf).newInstance() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 39834fc..e664b64 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -154,12 +154,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) - Seq("spark.jars", "spark.files").foreach { key => - conf.getOption(key).foreach { value => - val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value)) - if (resolved.nonEmpty) { - additionalProps.put(key, resolved.mkString(",")) - } + Seq(JARS, FILES).foreach { key => + val value = conf.get(key) + val resolved = KubernetesUtils.resolveFileUrisAndPath(value) + if (resolved.nonEmpty) { + additionalProps.put(key.key, resolved.mkString(",")) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala index 76b4ec9..bd3f8a1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -109,21 +109,22 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) } private def additionalJavaProperties(resource: String): Map[String, String] = { - resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource)) + resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource)) } private def additionalPythonProperties(resource: String): Map[String, String] = { resourceType(APP_RESOURCE_TYPE_PYTHON) ++ - mergeFileList("spark.files", Seq(resource) ++ conf.pyFiles) + mergeFileList(FILES, Seq(resource) ++ conf.pyFiles) } private def additionalRProperties(resource: String): Map[String, String] = { - resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource)) + resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource)) } - private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = { - val existing = Utils.stringToSeq(conf.get(key, "")) - Map(key -> (existing ++ filesToAdd).distinct.mkString(",")) + private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String]) + : Map[String, String] = { + val existing = conf.get(key) + Map(key.key -> (existing ++ filesToAdd).distinct.mkString(",")) } private def resourceType(resType: String): Map[String, String] = { diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 90255a5..ccf88cc 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -143,7 +143,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val sparkConf = new SparkConf() .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") .setJars(allJars) - .set("spark.files", allFiles.mkString(",")) + .set(FILES, allFiles) .set(CONTAINER_IMAGE, "spark-driver:latest") val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) @@ -154,8 +154,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { "spark.app.id" -> KubernetesTestConf.APP_ID, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix, "spark.kubernetes.submitInDriver" -> "true", - "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", + JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", + FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(additionalProperties === expectedSparkConf) } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index c869803..e539c8e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.JARS import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI.UI_ENABLED @@ -86,7 +87,7 @@ private[spark] class SparkAppConf { def get(key: String): String = map.getOrElse(key, "") - def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(",")) + def setJars(jars: Seq[String]): Unit = set(JARS.key, jars.mkString(",")) override def toString: String = map.toString diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 32ac4f3..bc1247a 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -25,6 +25,7 @@ import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.mesos.ui.MesosClusterUI import org.apache.spark.deploy.rest.mesos.MesosRestServer import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Deploy._ import 
org.apache.spark.scheduler.cluster.mesos._ import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, SparkUncaughtExceptionHandler, Utils} diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index dd0b2ba..2b8655c 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -63,11 +63,6 @@ package object config { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("30s") - private[spark] val RECOVERY_MODE = - ConfigBuilder("spark.deploy.recoveryMode") - .stringConf - .createWithDefault("NONE") - private[spark] val DISPATCHER_WEBUI_URL = ConfigBuilder("spark.mesos.dispatcher.webui.url") .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " + @@ -75,13 +70,6 @@ package object config { .stringConf .createOptional - private[spark] val ZOOKEEPER_URL = - ConfigBuilder("spark.deploy.zookeeper.url") - .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " + - "configuration is used to set the zookeeper URL to connect to.") - .stringConf - .createOptional - private[spark] val HISTORY_SERVER_URL = ConfigBuilder("spark.mesos.dispatcher.historyServer.url") .doc("Set the URL of the history server. The dispatcher will then " + diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala index 61ab3e8..123412f 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala @@ -26,6 +26,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkCuratorUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Deploy._ import org.apache.spark.util.Utils /** @@ -94,13 +95,13 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine( zk: CuratorFramework, conf: SparkConf) extends MesosClusterPersistenceEngine with Logging { - private val WORKING_DIR = - conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir + private val workingDir = + conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark_mesos_dispatcher") + "/" + baseDir - SparkCuratorUtil.mkdir(zk, WORKING_DIR) + SparkCuratorUtil.mkdir(zk, workingDir) def path(name: String): String = { - WORKING_DIR + "/" + name + workingDir + "/" + name } override def expunge(name: String): Unit = { @@ -129,6 +130,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine( } override def fetchAll[T](): Iterable[T] = { - zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T]) + zk.getChildren.forPath(workingDir).asScala.flatMap(fetch[T]) } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 021b1ac..8c961a5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -32,7 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} import org.apache.spark.deploy.mesos.{config, MesosDriverDescription} import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} -import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY} +import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.Utils @@ -432,7 +432,7 @@ private[spark] class MesosClusterScheduler( private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = { val confUris = List(conf.getOption("spark.mesos.uris"), desc.conf.getOption("spark.mesos.uris"), - desc.conf.getOption("spark.submit.pyFiles")).flatMap( + Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap( _.map(_.split(",").map(_.trim)) ).flatten @@ -534,16 +534,16 @@ private[spark] class MesosClusterScheduler( desc.conf.getOption(CORES_MAX.key).foreach { v => options ++= Seq("--total-executor-cores", v) } - desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles => - val formattedFiles = pyFiles.split(",") - .map { path => new File(sandboxPath, path.split("/").last).toString() } - .mkString(",") - options ++= Seq("--py-files", formattedFiles) - } + + val pyFiles = desc.conf.get(SUBMIT_PYTHON_FILES) + val formattedFiles = pyFiles.map { path => + new File(sandboxPath, path.split("/").last).toString() + }.mkString(",") + options ++= Seq("--py-files", formattedFiles) // --conf val replicatedOptionsBlacklist = Set( - "spark.jars", // Avoids duplicate classes in classpath + JARS.key, // Avoids duplicate classes in classpath "spark.submit.deployMode", // this would be set to `cluster`, but we need client "spark.master" // this contains the address of the dispatcher, not master ) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8492180..7992292 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -68,7 +68,7 @@ private[spark] class Client( private val yarnClient = YarnClient.createYarnClient private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) - private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" + private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster" // AM related configurations private val amMemory = if (isClusterMode) { @@ -1532,8 +1532,8 @@ private[spark] class YarnClusterApplication extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { // SparkSubmit would use yarn cache to distribute files & jars in yarn mode, // so remove them from sparkConf here for yarn mode. 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8492180..7992292 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -68,7 +68,7 @@ private[spark] class Client(
   private val yarnClient = YarnClient.createYarnClient
   private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
 
-  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
   // AM related configurations
   private val amMemory = if (isClusterMode) {
@@ -1532,8 +1532,8 @@ private[spark] class YarnClusterApplication extends SparkApplication {
   override def start(args: Array[String], conf: SparkConf): Unit = {
     // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
     // so remove them from sparkConf here for yarn mode.
-    conf.remove("spark.jars")
-    conf.remove("spark.files")
+    conf.remove(JARS)
+    conf.remove(FILES)
     new Client(new ClientArguments(args), conf).run()
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9acd995..25827fd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -40,6 +40,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers {
@@ -368,7 +369,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     val resources = Map("fpga" -> 2, "gpu" -> 3)
     ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
 
-    val conf = new SparkConf().set("spark.submit.deployMode", deployMode)
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
     resources.foreach { case (name, v) =>
       conf.set(prefix + name, v.toString)
     }
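Both YARN changes above lean on typed entries: SUBMIT_DEPLOY_MODE carries its own "client" default, so the old two-argument get with a fallback becomes a plain get, and JARS/FILES can be passed straight to conf.remove. A minimal sketch of the deploy-mode entry under those assumptions; the key and default come from the replaced call, the definition site is assumed:

  import org.apache.spark.SparkConf
  import org.apache.spark.internal.config.ConfigBuilder

  // Sketch: the default lives in the entry itself rather than at every call site.
  val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
    .stringConf
    .createWithDefault("client")

  val conf = new SparkConf()
  val isClusterMode = conf.get(SUBMIT_DEPLOY_MODE) == "cluster"  // false when unset ("client")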
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b7e83c8..faddb8f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -443,7 +443,7 @@ private object YarnClusterDriver extends Logging with Matchers {
     // If we are running in yarn-cluster mode, verify that driver logs links and present and are
     // in the expected format.
-    if (conf.get("spark.submit.deployMode") == "cluster") {
+    if (conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
       assert(listener.driverLogs.nonEmpty)
       val driverLogs = listener.driverLogs.get
       assert(driverLogs.size === 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
index 68f7de0..69728ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -21,6 +21,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.sql.test.SharedSQLContext
@@ -33,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSQLContext
 
   override protected def sparkConf: SparkConf = {
     // Make sure we use the KryoRegistrator
-    super.sparkConf.set("spark.kryo.registrator", TestRegistrator().getClass.getCanonicalName)
+    super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
   }
 
   test("Kryo registrator") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index e174dc6..0869e25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -45,8 +45,8 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
   private val conf = new SparkConf(false)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    .set("spark.serializer.objectStreamReset", "1")
-    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    .set(config.SERIALIZER_OBJECT_STREAM_RESET, 1)
+    .set(config.SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
 
   private def withFakeTaskContext(f: => Unit): Unit = {
     val sc = new SparkContext("local", "test", conf)
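The serializer and Kryo changes above also switch the value types: SERIALIZER_OBJECT_STREAM_RESET is now set with an Int and KRYO_REFERENCE_TRACKING with a Boolean, so mistyped values fail at compile time instead of being parsed at runtime. A minimal sketch of entries consistent with those call sites; the keys come from the replaced literals, while the defaults shown (100 and true) are assumptions:

  import org.apache.spark.SparkConf
  import org.apache.spark.internal.config.ConfigBuilder

  // Sketch: typed entries matching the Int/Boolean setters used above (defaults assumed).
  val SERIALIZER_OBJECT_STREAM_RESET = ConfigBuilder("spark.serializer.objectStreamReset")
    .intConf
    .createWithDefault(100)

  val KRYO_REFERENCE_TRACKING = ConfigBuilder("spark.kryo.referenceTracking")
    .booleanConf
    .createWithDefault(true)

  val conf = new SparkConf()
    .set(SERIALIZER_OBJECT_STREAM_RESET, 1)   // Int, not "1"
    .set(KRYO_REFERENCE_TRACKING, false)      // Boolean, not "false"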
"false")).newInstance() + (new SparkConf).set(KRYO_REFERENCE_TRACKING, false)).newInstance() val key = Seq(BoundReference(0, LongType, false)) // Testing Kryo serialization of HashedRelation --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org