This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 05372d1  [SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories
05372d1 is described below

commit 05372d188aeaeff5e8de8866ec6e7b932bafa70f
Author:     Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Thu Jan 3 14:30:27 2019 -0800

    [SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories

    ## What changes were proposed in this pull request?

    This PR changes the hardcoded configs below to use ConfigEntry:

    * spark.pyspark
    * spark.python
    * spark.r

    This patch doesn't change configs which are not relevant to SparkConf (e.g. system properties, Python source code).

    ## How was this patch tested?

    Existing tests.

    Closes #23428 from HeartSaVioR/SPARK-26489.

    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/api/python/PythonRunner.scala |  6 +--
 .../spark/api/python/PythonWorkerFactory.scala     | 15 +++----
 .../scala/org/apache/spark/api/r/RBackend.scala    | 10 ++---
 .../org/apache/spark/api/r/RBackendHandler.scala   |  7 ++--
 .../scala/org/apache/spark/api/r/RRunner.scala     |  8 ++--
 .../scala/org/apache/spark/deploy/RRunner.scala    |  9 +++--
 .../org/apache/spark/internal/config/Python.scala  | 47 ++++++++++++++++++++++
 .../config/R.scala}                                | 26 ++++++++----
 .../org/apache/spark/internal/config/package.scala |  4 --
 .../k8s/features/BasicExecutorFeatureStep.scala    |  1 +
 .../features/BasicExecutorFeatureStepSuite.scala   |  1 +
 .../org/apache/spark/deploy/yarn/Client.scala      |  1 +
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  1 +
 13 files changed, 96 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f73e95e..6b748c8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
@@ -71,7 +71,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val conf = SparkEnv.get.conf
   private val bufferSize = conf.getInt("spark.buffer.size", 65536)
-  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   // each python worker gets an equal part of the allocation. the worker pool will grow to the
   // number of concurrent tasks, which is determined by the number of cores in this executor.
   private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
@@ -496,7 +496,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {
     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
     setDaemon(true)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 1f2f503..09e219f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
@@ -41,7 +42,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   // currently only works on UNIX-based systems now because it uses signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   private val useDaemon = {
-    val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
+    val useDaemonEnabled = SparkEnv.get.conf.get(PYTHON_USE_DAEMON)
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
@@ -53,21 +54,21 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   // This configuration indicates the module to run the daemon to execute its Python workers.
   private val daemonModule =
-    SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    SparkEnv.get.conf.get(PYTHON_DAEMON_MODULE).map { value =>
       logInfo(
-        s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+        s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
         "using this to start the daemon up. Note that this configuration only has an effect when " +
-        "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+        s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
       value
     }.getOrElse("pyspark.daemon")
   // This configuration indicates the module to run each Python worker.
   private val workerModule =
-    SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+    SparkEnv.get.conf.get(PYTHON_WORKER_MODULE).map { value =>
       logInfo(
-        s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+        s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
        "using this to start the worker up. Note that this configuration only has an effect when " +
-        "'spark.python.use.daemon' is disabled or the platform is Windows.")
+        s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
       value
     }.getOrElse("pyspark.worker")
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 50c8fdf..36b4132 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -32,6 +32,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 /**
  * Netty-based backend server that is used to communicate between R and Java.
@@ -47,10 +48,8 @@ private[spark] class RBackend {
   def init(): (Int, RAuthHelper) = {
     val conf = new SparkConf()
-    val backendConnectionTimeout = conf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
-    bossGroup = new NioEventLoopGroup(
-      conf.getInt("spark.r.numRBackendThreads", SparkRDefaults.DEFAULT_NUM_RBACKEND_THREADS))
+    val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
+    bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
     val workerGroup = bossGroup
     val handler = new RBackendHandler(this)
     val authHelper = new RAuthHelper(conf)
@@ -126,8 +125,7 @@ private[spark] object RBackend extends Logging {
     // Connection timeout is set by socket client. To make it configurable we will pass the
     // timeout value to client inside the temp file
     val conf = new SparkConf()
-    val backendConnectionTimeout = conf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+    val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     // tell the R process via temporary file
     val path = args(0)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 18fc595..7b74efa 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -29,6 +29,7 @@ import io.netty.handler.timeout.ReadTimeoutException
 import org.apache.spark.SparkConf
 import org.apache.spark.api.r.SerDe._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.{ThreadUtils, Utils}
 /**
@@ -98,10 +99,8 @@ private[r] class RBackendHandler(server: RBackend)
         }
       }
     val conf = new SparkConf()
-    val heartBeatInterval = conf.getInt(
-      "spark.r.heartBeatInterval", SparkRDefaults.DEFAULT_HEARTBEAT_INTERVAL)
-    val backendConnectionTimeout = conf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+    val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
+    val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
     execService.scheduleAtFixedRate(pingRunner, interval, interval, TimeUnit.SECONDS)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index e7fdc39..3fdea04 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -27,6 +27,7 @@ import scala.util.Try
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.Utils
 /**
@@ -340,11 +341,10 @@ private[r] object RRunner {
     // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
     // but kept here for backward compatibility.
     val sparkConf = SparkEnv.get.conf
-    var rCommand = sparkConf.get("spark.sparkr.r.command", "Rscript")
-    rCommand = sparkConf.get("spark.r.command", rCommand)
+    var rCommand = sparkConf.get(SPARKR_COMMAND)
+    rCommand = sparkConf.get(R_COMMAND).orElse(Some(rCommand)).get
-    val rConnectionTimeout = sparkConf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+    val rConnectionTimeout = sparkConf.get(R_BACKEND_CONNECTION_TIMEOUT)
     val rOptions = "--vanilla"
     val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
     val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index e86b362..6284e6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -25,7 +25,8 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 import org.apache.spark.{SparkException, SparkUserAppException}
-import org.apache.spark.api.r.{RBackend, RUtils, SparkRDefaults}
+import org.apache.spark.api.r.{RBackend, RUtils}
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.RedirectThread
 /**
@@ -43,8 +44,8 @@ object RRunner {
     val rCommand = {
       // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
       // but kept here for backward compatibility.
-      var cmd = sys.props.getOrElse("spark.sparkr.r.command", "Rscript")
-      cmd = sys.props.getOrElse("spark.r.command", cmd)
+      var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
+      cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
       if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
         cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
       }
@@ -53,7 +54,7 @@ object RRunner {
     // Connection timeout set by R process on its connection to RBackend in seconds.
     val backendConnectionTimeout = sys.props.getOrElse(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT.toString)
+      R_BACKEND_CONNECTION_TIMEOUT.key, R_BACKEND_CONNECTION_TIMEOUT.defaultValue.get.toString)
     // Check if the file path exists.
     // If not, change directory to current working directory for YARN cluster mode
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
new file mode 100644
index 0000000..26a0598
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Python {
+  val PYTHON_WORKER_REUSE = ConfigBuilder("spark.python.worker.reuse")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PYTHON_TASK_KILL_TIMEOUT = ConfigBuilder("spark.python.task.killTimeout")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("2s")
+
+  val PYTHON_USE_DAEMON = ConfigBuilder("spark.python.use.daemon")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PYTHON_DAEMON_MODULE = ConfigBuilder("spark.python.daemon.module")
+    .stringConf
+    .createOptional
+
+  val PYTHON_WORKER_MODULE = ConfigBuilder("spark.python.worker.module")
+    .stringConf
+    .createOptional
+
+  val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+}
diff --git a/core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala b/core/src/main/scala/org/apache/spark/internal/config/R.scala
similarity index 56%
rename from core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala
rename to core/src/main/scala/org/apache/spark/internal/config/R.scala
index af67cbb..26e06a5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/R.scala
@@ -14,17 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.internal.config
-package org.apache.spark.api.r
+private[spark] object R {
-private[spark] object SparkRDefaults {
+  val R_BACKEND_CONNECTION_TIMEOUT = ConfigBuilder("spark.r.backendConnectionTimeout")
+    .intConf
+    .createWithDefault(6000)
-  // Default value for spark.r.backendConnectionTimeout config
-  val DEFAULT_CONNECTION_TIMEOUT: Int = 6000
+  val R_NUM_BACKEND_THREADS = ConfigBuilder("spark.r.numRBackendThreads")
+    .intConf
+    .createWithDefault(2)
-  // Default value for spark.r.heartBeatInterval config
-  val DEFAULT_HEARTBEAT_INTERVAL: Int = 100
+  val R_HEARTBEAT_INTERVAL = ConfigBuilder("spark.r.heartBeatInterval")
+    .intConf
+    .createWithDefault(100)
-  // Default value for spark.r.numRBackendThreads config
-  val DEFAULT_NUM_RBACKEND_THREADS = 2
+  val SPARKR_COMMAND = ConfigBuilder("spark.sparkr.r.command")
+    .stringConf
+    .createWithDefault("Rscript")
+
+  val R_COMMAND = ConfigBuilder("spark.r.command")
+    .stringConf
+    .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d8e9c09..da80604 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -166,10 +166,6 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)
-  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c8bf7cd..dd73a5e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index c2efab0..e28c650 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf,
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 184fb6a..44a60b8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -53,6 +53,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.util.{CallerContext, Utils}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a3feca5..8c6eff99 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
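For readers skimming the diff, the pattern applied throughout is: declare each config once with ConfigBuilder (key, type, default) in Python.scala or R.scala, then read it type-safely with conf.get(ENTRY), instead of repeating the string key and an inline default at every call site. The sketch below is a minimal illustration of that pattern, not code from this commit: `EXAMPLE_FLAG` and "spark.example.flag" are made-up names, and it assumes it is compiled inside Spark's own org.apache.spark.internal.config package, since ConfigBuilder and SparkConf.get(ConfigEntry) are private[spark].

```scala
package org.apache.spark.internal.config

import org.apache.spark.SparkConf

object ConfigEntryExample {
  // Declared once: the key, the type and the default live in a single place.
  // "spark.example.flag" is a hypothetical key used only for illustration.
  val EXAMPLE_FLAG = ConfigBuilder("spark.example.flag")
    .booleanConf
    .createWithDefault(true)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Old style: string key and default repeated at each read site.
    val before = conf.getBoolean("spark.example.flag", true)

    // New style: the ConfigEntry carries the key, type and default.
    val after = conf.get(EXAMPLE_FLAG)

    assert(before == after)
  }
}
```

The practical benefit is the one visible in the diff: defaults such as 6000 for spark.r.backendConnectionTimeout or "Rscript" for spark.sparkr.r.command now sit next to their keys in one file rather than being duplicated at every read (or kept separately in SparkRDefaults).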