This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 13b23b247351 [SPARK-53459][CORE] Use ReadOnlySparkConf in more places 13b23b247351 is described below commit 13b23b247351ed1afe6a17acfe919316290a3eca Author: Tim Armstrong <tim.armstr...@databricks.com> AuthorDate: Thu Sep 4 10:09:27 2025 +0800 [SPARK-53459][CORE] Use ReadOnlySparkConf in more places ### What changes were proposed in this pull request? Convert some more places to use ReadOnlySparkConf, which was added by SPARK-50515/https://github.com/apache/spark/pull/49100. ### Why are the changes needed? Utils should follow best practices for using read-only confs to discourage unnecessary conf cloning elsewhere in the codebase. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No tests added. ### Was this patch authored or co-authored using generative AI tooling? No Closes #52223 from timarmstrong/spark-53459-readonly-conf. Authored-by: Tim Armstrong <tim.armstr...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/streaming/kafka010/KafkaUtils.scala | 2 +- .../streaming/kafka010/PerPartitionConfig.scala | 4 +- .../kinesis/KinesisReadConfigurations.scala | 4 +- .../main/scala/org/apache/spark/SparkConf.scala | 12 ++--- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 12 ++--- .../main/scala/org/apache/spark/util/Utils.scala | 54 +++++++++++----------- .../scala/org/apache/spark/repl/SparkILoop.scala | 4 +- .../scheduler/cluster/YarnSchedulerBackend.scala | 2 +- .../sql/hive/thriftserver/HiveThriftServer2.scala | 2 +- 9 files changed, 49 insertions(+), 47 deletions(-) diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index 54cb9ff39d99..6b0ec6294710 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -112,7 +112,7 @@ object KafkaUtils extends Logging { locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V] ): InputDStream[ConsumerRecord[K, V]] = { - val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf) + val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getReadOnlyConf) createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc) } diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala index b261500a454a..058ac00ae27d 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.SparkConf +import org.apache.spark.ReadOnlySparkConf /** * Interface for user-supplied configurations that can't otherwise be set via Spark properties, @@ -37,7 +37,7 @@ abstract class PerPartitionConfig extends Serializable { /** * Default per-partition configuration */ -private class DefaultPerPartitionConfig(conf: SparkConf) +private class DefaultPerPartitionConfig(conf: ReadOnlySparkConf) extends PerPartitionConfig { val maxRate = conf.get(MAX_RATE_PER_PARTITION) val minRate = conf.get(MIN_RATE_PER_PARTITION) diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala index 871071e4677e..c2d0a39d6ae9 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala @@ -44,9 +44,9 @@ private[kinesis] object KinesisReadConfigurations { def apply(ssc: StreamingContext): KinesisReadConfigurations = { KinesisReadConfigurations( - maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES), + maxRetries = ssc.sc.getReadOnlyConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES), retryWaitTimeMs = JavaUtils.timeStringAsMs( - ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)), + ssc.sc.getReadOnlyConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)), retryTimeoutMs = ssc.graph.batchDuration.milliseconds) } diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 1d1901ae4f68..65b7fee9d889 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -255,6 +255,12 @@ trait ReadOnlySparkConf { throw new IllegalArgumentException(s"Illegal value for config key $key: ${e.getMessage}", e) } } + + /** + * By using this instead of System.getenv(), environment variables can be mocked + * in unit tests. + */ + private[spark] def getenv(name: String): String = System.getenv(name) } /** @@ -528,12 +534,6 @@ class SparkConf(loadDefaults: Boolean) cloned } - /** - * By using this instead of System.getenv(), environment variables can be mocked - * in unit tests. - */ - private[spark] def getenv(name: String): String = System.getenv(name) - /** * Checks for illegal or deprecated config settings. Throws an exception for the former. Not * idempotent - may mutate this conf object to convert deprecated settings to supported ones. diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 9c40f4cd1cf7..4af2aa6394b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{ReadOnlySparkConf, SparkConf, SparkException} import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.util.ArrayImplicits._ @@ -90,7 +90,7 @@ private[spark] class SparkHadoopUtil extends Logging { * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop * configuration without the spark.hadoop. prefix. */ - def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { + def appendSparkHadoopConfigs(conf: ReadOnlySparkConf, hadoopConf: Configuration): Unit = { SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf) } @@ -430,14 +430,14 @@ private[spark] object SparkHadoopUtil extends Logging { * and if found on the classpath, those of core-site.xml. * This is done before the spark overrides are applied. */ - private[spark] def newConfiguration(conf: SparkConf): Configuration = { + private[spark] def newConfiguration(conf: ReadOnlySparkConf): Configuration = { val hadoopConf = new Configuration() appendS3AndSparkHadoopHiveConfigurations(conf, hadoopConf) hadoopConf } private def appendS3AndSparkHadoopHiveConfigurations( - conf: SparkConf, + conf: ReadOnlySparkConf, hadoopConf: Configuration): Unit = { // Note: this null check is around more than just access to the "conf" object to maintain // the behavior of the old implementation of this code, for backwards compatibility. @@ -514,7 +514,7 @@ private[spark] object SparkHadoopUtil extends Logging { } } - private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { + private def appendSparkHadoopConfigs(conf: ReadOnlySparkConf, hadoopConf: Configuration): Unit = { // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) { hadoopConf.set(key.substring("spark.hadoop.".length), value, @@ -532,7 +532,7 @@ private[spark] object SparkHadoopUtil extends Logging { } } - private def appendSparkHiveConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { + private def appendSparkHiveConfigs(conf: ReadOnlySparkConf, hadoopConf: Configuration): Unit = { // Copy any "spark.hive.foo=bar" spark properties into conf as "hive.foo=bar" for ((key, value) <- conf.getAll if key.startsWith("spark.hive.")) { hadoopConf.set(key.substring("spark.".length), value, SOURCE_SPARK_HIVE) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3b525cd69430..1a5147e483ca 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -348,7 +348,7 @@ private[spark] object Utils def fetchFile( url: String, targetDir: File, - conf: SparkConf, + conf: ReadOnlySparkConf, hadoopConf: Configuration, timestamp: Long, useCache: Boolean, @@ -622,7 +622,7 @@ private[spark] object Utils url: String, targetDir: File, filename: String, - conf: SparkConf, + conf: ReadOnlySparkConf, hadoopConf: Configuration): File = { val targetFile = new File(targetDir, filename) val uri = new URI(url) @@ -669,7 +669,7 @@ private[spark] object Utils path: Path, targetDir: File, fs: FileSystem, - conf: SparkConf, + conf: ReadOnlySparkConf, hadoopConf: Configuration, fileOverwrite: Boolean, filename: Option[String] = None): Unit = { @@ -724,7 +724,7 @@ private[spark] object Utils * always return a single directory. The return directory is chosen randomly from the array * of directories it gets from getOrCreateLocalRootDirs. */ - def getLocalDir(conf: SparkConf): String = { + def getLocalDir(conf: ReadOnlySparkConf): String = { val localRootDirs = getOrCreateLocalRootDirs(conf) if (localRootDirs.isEmpty) { val configuredLocalDirs = getConfiguredLocalDirs(conf) @@ -735,7 +735,7 @@ private[spark] object Utils } } - private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = { + private[spark] def isRunningInYarnContainer(conf: ReadOnlySparkConf): Boolean = { // These environment variables are set by YARN. conf.getenv("CONTAINER_ID") != null } @@ -755,7 +755,7 @@ private[spark] object Utils * So calling it multiple times with a different configuration will always return the same * set of directories. */ - private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = { + private[spark] def getOrCreateLocalRootDirs(conf: ReadOnlySparkConf): Array[String] = { if (localRootDirs == null) { this.synchronized { if (localRootDirs == null) { @@ -771,7 +771,7 @@ private[spark] object Utils * method does not create any directories on its own, it only encapsulates the * logic of locating the local directories according to deployment mode. */ - def getConfiguredLocalDirs(conf: SparkConf): Array[String] = { + def getConfiguredLocalDirs(conf: ReadOnlySparkConf): Array[String] = { if (isRunningInYarnContainer(conf)) { // If we are in yarn mode, systems can have different disk layouts so we must set it // to what Yarn on this system said was available. Note this assumes that Yarn has @@ -790,7 +790,7 @@ private[spark] object Utils } } - private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = { + private def getOrCreateLocalRootDirsImpl(conf: ReadOnlySparkConf): Array[String] = { val configuredLocalDirs = getConfiguredLocalDirs(conf) val uris = configuredLocalDirs.filter { root => // Here, we guess if the given value is a URI at its best - check if scheme is set. @@ -825,7 +825,7 @@ private[spark] object Utils } /** Get the Yarn approved local directories. */ - private def getYarnLocalDirs(conf: SparkConf): String = { + private def getYarnLocalDirs(conf: ReadOnlySparkConf): String = { val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("") if (localDirs.isEmpty) { @@ -1519,7 +1519,7 @@ private[spark] object Utils private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null private def getCompressedLogFileLengthCache( - sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized { + sparkConf: ReadOnlySparkConf): LoadingCache[String, java.lang.Long] = this.synchronized { if (compressedLogFileLengthCache == null) { val compressedLogFileLengthCacheSize = sparkConf.get( UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF) @@ -1539,7 +1539,7 @@ private[spark] object Utils * It also caches the uncompressed file size to avoid repeated decompression. The cache size is * read from workerConf. */ - def getFileLength(file: File, workConf: SparkConf): Long = { + def getFileLength(file: File, workConf: ReadOnlySparkConf): Long = { if (file.getName.endsWith(".gz")) { getCompressedLogFileLengthCache(workConf).get(file.getAbsolutePath) } else { @@ -2177,7 +2177,9 @@ private[spark] object Utils /** * Convert all spark properties set in the given SparkConf to a sequence of java options. */ - def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = { + def sparkJavaOpts( + conf: ReadOnlySparkConf, + filterKey: (String => Boolean) = _ => true): Seq[String] = { conf.getAll .filter { case (k, _) => filterKey(k) } .map { case (k, v) => s"-D$k=$v" } @@ -2187,7 +2189,7 @@ private[spark] object Utils /** * Maximum number of retries when binding to a port before giving up. */ - def portMaxRetries(conf: SparkConf): Int = { + def portMaxRetries(conf: ReadOnlySparkConf): Int = { val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt) if (conf.contains(IS_TESTING)) { // Set a higher number of retries for tests... @@ -2212,7 +2214,7 @@ private[spark] object Utils def startServiceOnPort[T]( startPort: Int, startService: Int => (T, Int), - conf: SparkConf, + conf: ReadOnlySparkConf, serviceName: String = ""): (T, Int) = { startServiceOnPort(startPort, startService, portMaxRetries(conf), serviceName) } @@ -2443,7 +2445,7 @@ private[spark] object Utils val EMPTY_USER_GROUPS = Set.empty[String] // Returns the groups to which the current user belongs. - def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = { + def getCurrentUserGroups(sparkConf: ReadOnlySparkConf, username: String): Set[String] = { val groupProviderClassName = sparkConf.get(USER_GROUPS_MAPPING) if (groupProviderClassName != "") { try { @@ -2606,7 +2608,7 @@ private[spark] object Utils (!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING)) } - def isStreamingDynamicAllocationEnabled(conf: SparkConf): Boolean = { + def isStreamingDynamicAllocationEnabled(conf: ReadOnlySparkConf): Boolean = { val streamingDynamicAllocationEnabled = conf.get(STREAMING_DYN_ALLOCATION_ENABLED) streamingDynamicAllocationEnabled && (!isLocalMaster(conf) || conf.get(STREAMING_DYN_ALLOCATION_TESTING)) @@ -2615,7 +2617,7 @@ private[spark] object Utils /** * Return the initial number of executors for dynamic allocation. */ - def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = { + def getDynamicAllocationInitialExecutors(conf: ReadOnlySparkConf): Int = { if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { logWarning(log"${MDC(CONFIG, DYN_ALLOCATION_INITIAL_EXECUTORS.key)} less than " + log"${MDC(CONFIG2, DYN_ALLOCATION_MIN_EXECUTORS.key)} is invalid, ignoring its setting, " + @@ -2682,7 +2684,7 @@ private[spark] object Utils * This is designed for a code path which logging system may be initilized before * loading SparkConf. */ - def resetStructuredLogging(sparkConf: SparkConf): Unit = { + def resetStructuredLogging(sparkConf: ReadOnlySparkConf): Unit = { if (sparkConf.get(STRUCTURED_LOGGING_ENABLED)) { Logging.enableStructuredLogging() } else { @@ -2695,7 +2697,7 @@ private[spark] object Utils * these jars through file server. In the YARN mode, it will return an empty list, since YARN * has its own mechanism to distribute jars. */ - def getUserJars(conf: SparkConf): Seq[String] = { + def getUserJars(conf: ReadOnlySparkConf): Seq[String] = { conf.get(JARS).filter(_.nonEmpty) } @@ -2704,7 +2706,7 @@ private[spark] object Utils * specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by * SparkSubmit at first. */ - def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = { + def getLocalUserJarsForShell(conf: ReadOnlySparkConf): Seq[String] = { val localJars = conf.getOption("spark.repl.local.jars") localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten } @@ -2715,7 +2717,7 @@ private[spark] object Utils * Redact the sensitive values in the given map. If a map key matches the redaction pattern then * its value is replaced with a dummy text. */ - def redact(conf: SparkConf, + def redact(conf: ReadOnlySparkConf, kvs: scala.collection.Seq[(String, String)]): scala.collection.Seq[(String, String)] = { val redactionPattern = conf.get(SECRET_REDACTION_PATTERN) redact(redactionPattern, kvs) @@ -2792,7 +2794,7 @@ private[spark] object Utils redact(redactionPattern, kvs.toArray) } - def redactCommandLineArgs(conf: SparkConf, commands: Seq[String]): Seq[String] = { + def redactCommandLineArgs(conf: ReadOnlySparkConf, commands: Seq[String]): Seq[String] = { val redactionPattern = conf.get(SECRET_REDACTION_PATTERN) commands.map { case PATTERN_FOR_COMMAND_LINE_ARG(key, value) => @@ -2913,7 +2915,7 @@ private[spark] object Utils opt.replace("{{SPARK_VERSION}}", SPARK_VERSION) } - def createSecret(conf: SparkConf): String = { + def createSecret(conf: ReadOnlySparkConf): String = { val bits = conf.get(AUTH_SECRET_BIT_LENGTH) val rnd = new SecureRandom() val secretBytes = new Array[Byte](bits / JByte.SIZE) @@ -2964,7 +2966,7 @@ private[spark] object Utils } } - def isClientMode(conf: SparkConf): Boolean = { + def isClientMode(conf: ReadOnlySparkConf): Boolean = { "client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client")) } @@ -3030,7 +3032,7 @@ private[spark] object Utils /** * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false. */ - def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = { + def executorOffHeapMemorySizeAsMb(sparkConf: ReadOnlySparkConf): Int = { val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString) checkOffHeapEnabled(sparkConf, sizeInMB).toInt } @@ -3038,7 +3040,7 @@ private[spark] object Utils /** * return 0 if MEMORY_OFFHEAP_ENABLED is false. */ - def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = { + def checkOffHeapEnabled(sparkConf: ReadOnlySparkConf, offHeapSize: Long): Long = { if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) { require(offHeapSize > 0, s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true") diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index ecb46c478a20..8cfd81b5595d 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -50,8 +50,8 @@ class SparkILoop(config: ShellConfig, in0: BufferedReader, out: PrintWriter) } @transient val sc = { val _sc = spark.sparkContext - if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { - val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (_sc.getReadOnlyConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getReadOnlyConf.get("spark.ui.reverseProxyUrl", null) if (proxyUrl != null) { println( s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index dae3317f0892..b51c52e0e169 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -179,7 +179,7 @@ private[spark] abstract class YarnSchedulerBackend( resourceProfileId: Int): Seq[BlockManagerId] = { // TODO (SPARK-33481) This is a naive way of calculating numMergersDesired for a stage, // TODO we can use better heuristics to calculate numMergersDesired for a stage. - val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { + val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getReadOnlyConf)) { maxNumExecutors } else { numExecutors diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index a1ed6e61e8be..f0ccd5320c1a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -96,7 +96,7 @@ object HiveThriftServer2 extends Logging { eventManager = new HiveThriftServer2EventManager(sc) listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server)) sc.listenerBus.addToStatusQueue(listener) - uiTab = if (sc.getConf.get(UI_ENABLED)) { + uiTab = if (sc.getReadOnlyConf.get(UI_ENABLED)) { Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore), ThriftServerTab.getSparkUI(sc))) } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org