This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b575ef5 [SPARK-26621][CORE] Use ConfigEntry for hardcoded configs for shuffle categories.
1b575ef5 is described below

commit 1b575ef5d1b8e3e672b2fca5c354d6678bd78bd1
Author: liuxian <liu.xi...@zte.com.cn>
AuthorDate: Thu Jan 17 12:29:17 2019 -0600

    [SPARK-26621][CORE] Use ConfigEntry for hardcoded configs for shuffle categories.
    
    ## What changes were proposed in this pull request?
    
    The PR makes the hardcoded `spark.shuffle` configs use ConfigEntry and puts them in the config package.
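    
    For illustration only (this snippet is not part of the diff; it restates the pattern using the `SHUFFLE_COMPRESS` entry added in this patch):
    
    ```scala
    // Definition in the org.apache.spark.internal.config package object:
    private[spark] val SHUFFLE_COMPRESS =
      ConfigBuilder("spark.shuffle.compress")
        .doc("Whether to compress shuffle output. Compression will use " +
          "spark.io.compression.codec.")
        .booleanConf
        .createWithDefault(true)
    
    // Call sites then drop the raw key and inline default, e.g. in SerializerManager:
    //   conf.getBoolean("spark.shuffle.compress", true)        // before
    val compressShuffle = conf.get(config.SHUFFLE_COMPRESS)     // after: key and default live in one place
    ```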
    
    ## How was this patch tested?
    Existing unit tests
    
    Closes #23550 from 10110346/ConfigEntry_shuffle.
    
    Authored-by: liuxian <liu.xi...@zte.com.cn>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java |  3 +-
 .../spark/shuffle/sort/ShuffleExternalSorter.java  |  2 +-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java    | 10 +--
 .../scala/org/apache/spark/MapOutputTracker.scala  | 13 ++--
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  4 +-
 .../org/apache/spark/internal/config/package.scala | 90 ++++++++++++++++++++++
 .../spark/serializer/SerializerManager.scala       |  4 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala    |  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala     |  4 +-
 .../org/apache/spark/storage/BlockManager.scala    |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../util/collection/ExternalAppendOnlyMap.scala    |  7 +-
 .../spark/util/collection/ExternalSorter.scala     |  6 +-
 .../apache/spark/util/collection/Spillable.scala   |  2 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     | 15 ++--
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   |  7 +-
 .../unsafe/sort/UnsafeExternalSorterSuite.java     |  2 +-
 .../org/apache/spark/ContextCleanerSuite.scala     |  6 +-
 .../apache/spark/ExternalShuffleServiceSuite.scala |  6 +-
 .../org/apache/spark/MapOutputTrackerSuite.scala   |  7 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala | 14 ++--
 .../scala/org/apache/spark/SortShuffleSuite.scala  |  3 +-
 .../spark/security/CryptoStreamUtilsSuite.scala    |  4 +-
 .../spark/serializer/KryoSerializerSuite.scala     |  4 +-
 .../shuffle/BlockStoreShuffleReaderSuite.scala     |  5 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  6 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    | 22 +++---
 .../util/collection/ExternalSorterSuite.scala      | 48 ++++++------
 .../sql/execution/UnsafeExternalRowSorter.java     |  4 +-
 .../expressions/RowBasedKeyValueBatchSuite.java    |  5 +-
 .../sql/execution/UnsafeKVExternalSorter.java      |  7 +-
 .../execution/exchange/ShuffleExchangeExec.scala   |  3 +-
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  8 +-
 33 files changed, 211 insertions(+), 116 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 997bc9e..32b4467 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -34,6 +34,7 @@ import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -104,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf,
       ShuffleWriteMetricsReporter writeMetrics) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index dc43215..0247560 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -129,7 +129,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
         (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
-      this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
+      this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()));
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
         (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 4b0c743..3608106 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.spark.*;
 import org.apache.spark.annotation.Private;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.io.CompressionCodec;
 import org.apache.spark.io.CompressionCodec$;
 import org.apache.spark.io.NioBufferedFileInputStream;
@@ -55,7 +56,6 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;
-import org.apache.spark.internal.config.package$;
 
 @Private
 public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -143,8 +143,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
-    this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
-                                                  DEFAULT_INITIAL_SORT_BUFFER_SIZE);
+    this.initialSortBufferSize =
+      (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
     this.inputBufferSizeInBytes =
       (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.outputBufferSizeInBytes =
@@ -282,10 +282,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
    * @return the partition lengths in the merged file.
    */
   private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
-    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
+    final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS());
     final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
     final boolean fastMergeEnabled =
-      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
+      (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE());
     final boolean fastMergeIsSupported = !compressionEnabled ||
       CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
     final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1c4fa4b..a8b8e96 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -322,11 +322,10 @@ private[spark] class MapOutputTrackerMaster(
   extends MapOutputTracker(conf) {
 
   // The size at which we use Broadcast to send the map output statuses to the executors
-  private val minSizeForBroadcast =
-    conf.getSizeAsBytes("spark.shuffle.mapOutput.minSizeForBroadcast", "512k").toInt
+  private val minSizeForBroadcast = conf.get(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST).toInt
 
   /** Whether to compute locality preferences for reduce tasks */
-  private val shuffleLocalityEnabled = conf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+  private val shuffleLocalityEnabled = conf.get(SHUFFLE_REDUCE_LOCALITY_ENABLE)
 
   // Number of map and reduce tasks above which we do not assign preferred locations based on map
   // output sizes. We limit the size of jobs for which assign preferred locations as computing the
@@ -353,7 +352,7 @@ private[spark] class MapOutputTrackerMaster(
   // Thread pool used for handling map output status requests. This is a separate thread pool
   // to ensure we don't block the normal dispatcher threads.
   private val threadpool: ThreadPoolExecutor = {
-    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
+    val numThreads = conf.get(SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS)
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
@@ -364,9 +363,9 @@ private[spark] class MapOutputTrackerMaster(
   // Make sure that we aren't going to exceed the max RPC message size by making sure
   // we use broadcast to send large map output statuses.
   if (minSizeForBroadcast > maxRpcMessageSize) {
-    val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
-      s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " +
-      "message that is too large."
+    val msg = s"${SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key} ($minSizeForBroadcast bytes) " +
+      s"must be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an " +
+      "rpc message that is too large."
     logError(msg)
     throw new IllegalArgumentException(msg)
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ff4a043..36998d1 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
@@ -318,7 +318,7 @@ object SparkEnv extends Logging {
     val shortShuffleMgrNames = Map(
       "sort" -> 
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
       "tungsten-sort" -> 
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
-    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
+    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
     val shuffleMgrClass =
       shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 99ce220..6488d53 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -749,6 +749,96 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("1h")
 
+  private[spark] val SHUFFLE_SORT_INIT_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.sort.initialBufferSize")
+      .internal()
+      .intConf
+      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .createWithDefault(4096)
+
+  private[spark] val SHUFFLE_COMPRESS =
+    ConfigBuilder("spark.shuffle.compress")
+      .doc("Whether to compress shuffle output. Compression will use " +
+        "spark.io.compression.codec.")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_SPILL_COMPRESS =
+    ConfigBuilder("spark.shuffle.spill.compress")
+      .doc("Whether to compress data spilled during shuffles. Compression will 
use " +
+        "spark.io.compression.codec.")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD =
+    ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold")
+      .internal()
+      .doc("Initial threshold for the size of a collection before we start 
tracking its " +
+        "memory usage.")
+      .longConf
+      .createWithDefault(5 * 1024 * 1024)
+
+  private[spark] val SHUFFLE_SPILL_BATCH_SIZE =
+    ConfigBuilder("spark.shuffle.spill.batchSize")
+      .internal()
+      .doc("Size of object batches when reading/writing from serializers.")
+      .longConf
+      .createWithDefault(10000)
+
+  private[spark] val SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD =
+    ConfigBuilder("spark.shuffle.sort.bypassMergeThreshold")
+      .doc("In the sort-based shuffle manager, avoid merge-sorting data if 
there is no " +
+        "map-side aggregation and there are at most this many reduce 
partitions")
+      .intConf
+      .createWithDefault(200)
+
+  private[spark] val SHUFFLE_MANAGER =
+    ConfigBuilder("spark.shuffle.manager")
+      .stringConf
+      .createWithDefault("sort")
+
+  private[spark] val SHUFFLE_REDUCE_LOCALITY_ENABLE =
+    ConfigBuilder("spark.shuffle.reduceLocality.enabled")
+      .doc("Whether to compute locality preferences for reduce tasks")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST =
+    ConfigBuilder("spark.shuffle.mapOutput.minSizeForBroadcast")
+      .doc("The size at which we use Broadcast to send the map output statuses 
to the executors.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("512k")
+
+  private[spark] val SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS =
+    ConfigBuilder("spark.shuffle.mapOutput.dispatcher.numThreads")
+      .intConf
+      .createWithDefault(8)
+
+  private[spark] val SHUFFLE_DETECT_CORRUPT =
+    ConfigBuilder("spark.shuffle.detectCorrupt")
+      .doc("Whether to detect any corruption in fetched blocks.")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_SYNC =
+    ConfigBuilder("spark.shuffle.sync")
+      .doc("Whether to force outstanding writes to disk.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_UNDAFE_FAST_MERGE_ENABLE =
+    ConfigBuilder("spark.shuffle.unsafe.fastMergeEnabled")
+      .doc("Whether to perform a fast spill merge.")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_SORT_USE_RADIXSORT =
+    ConfigBuilder("spark.shuffle.sort.useRadixSort")
+      .doc("Whether to use radix sort for sorting in-memory partition ids. 
Radix sort is much " +
+        "faster, but requires additional memory to be reserved memory as 
pointers are added.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
     ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 1e233ca..3e3c387 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -66,11 +66,11 @@ private[spark] class SerializerManager(
   // Whether to compress broadcast variables that are stored
   private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS)
   // Whether to compress shuffle output that are stored
-  private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
+  private[this] val compressShuffle = conf.get(config.SHUFFLE_COMPRESS)
   // Whether to compress RDD partitions that are stored serialized
   private[this] val compressRdds = conf.get(config.RDD_COMPRESS)
   // Whether to compress shuffle output temporarily spilled to disk
-  private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
+  private[this] val compressShuffleSpill = conf.get(config.SHUFFLE_SPILL_COMPRESS)
 
   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
    * the initialization of the compression codec until it is first used. The reason is that a Spark
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 37f5169..daafe30 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -54,7 +54,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
       SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
       SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
-      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true),
+      SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
       readMetrics)
 
     val serializerInstance = dep.serializer.newInstance()
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 274399b..16058de 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.shuffle.sort
 
 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
 import org.apache.spark.storage.ShuffleBlockId
@@ -108,7 +108,7 @@ private[spark] object SortShuffleWriter {
     if (dep.mapSideCombine) {
       false
     } else {
-      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+      val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
       dep.partitioner.numPartitions <= bypassMergeThreshold
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 5bfe778..8f993bf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -942,7 +942,7 @@ private[spark] class BlockManager(
       serializerInstance: SerializerInstance,
       bufferSize: Int,
       writeMetrics: ShuffleWriteMetricsReporter): DiskBlockObjectWriter = {
-    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
+    val syncWrites = conf.get(config.SHUFFLE_SYNC)
     new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize,
       syncWrites, writeMetrics, blockId)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7416559..1a6dc1f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -812,7 +812,7 @@ private[spark] object Utils extends Logging {
     } else {
       if (conf.getenv("MESOS_SANDBOX") != null && shuffleServiceEnabled) {
         logInfo("MESOS_SANDBOX available but not using provided Mesos sandbox 
because " +
-          "spark.shuffle.service.enabled is enabled.")
+          s"${config.SHUFFLE_SERVICE_ENABLED.key} is enabled.")
       }
       // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
       // configuration to point to a secure directory. So create a subdirectory with restricted
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 19ff109..1ba3b78 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.serializer.{DeserializationStream, Serializer, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.CompletionIterator
@@ -97,15 +97,14 @@ class ExternalAppendOnlyMap[K, V, C](
    * NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
    * grow internal data structures by growing + copying every time the number of objects doubles.
    */
-  private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)
+  private val serializerBatchSize = sparkConf.get(config.SHUFFLE_SPILL_BATCH_SIZE)
 
   // Number of bytes spilled in total
   private var _diskBytesSpilled = 0L
   def diskBytesSpilled: Long = _diskBytesSpilled
 
   // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-  private val fileBufferSize =
-    sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
+  private val fileBufferSize = sparkConf.get(config.SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024
 
   // Write metrics
   private val writeMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 46279e7..4806c13 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.serializer._
 import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter}
 
@@ -109,7 +109,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val serInstance = serializer.newInstance()
 
   // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-  private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
+  private val fileBufferSize = conf.get(config.SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024
 
   // Size of object batches when reading/writing from serializers.
   //
@@ -118,7 +118,7 @@ private[spark] class ExternalSorter[K, V, C](
   //
   // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
   // grow internal data structures by growing + copying every time the number of objects doubles.
-  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)
+  private val serializerBatchSize = conf.get(config.SHUFFLE_SPILL_BATCH_SIZE)
 
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 81457b5..bfc0fac 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -51,7 +51,7 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // For testing only
   private[this] val initialMemoryThreshold: Long =
-    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)
+    SparkEnv.get.conf.get(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD)
 
   // Force this collection to spill when there are this many elements in memory
   // For testing only
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index aa5082f..f34ae99 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -45,6 +45,7 @@ import org.apache.spark.io.CompressionCodec$;
 import org.apache.spark.io.LZ4CompressionCodec;
 import org.apache.spark.io.LZFCompressionCodec;
 import org.apache.spark.io.SnappyCompressionCodec;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.LimitedInputStream;
@@ -184,7 +185,7 @@ public class UnsafeShuffleWriterSuite {
         fin.getChannel().position(startOffset);
         InputStream in = new LimitedInputStream(fin, partitionSize);
         in = blockManager.serializerManager().wrapForEncryption(in);
-        if (conf.getBoolean("spark.shuffle.compress", true)) {
+        if ((boolean) conf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
           in = CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(in);
         }
         try (DeserializationStream recordsStream = serializer.newInstance().deserializeStream(in)) {
@@ -279,12 +280,12 @@ public class UnsafeShuffleWriterSuite {
       String compressionCodecName,
       boolean encrypt) throws Exception {
     if (compressionCodecName != null) {
-      conf.set("spark.shuffle.compress", "true");
+      conf.set(package$.MODULE$.SHUFFLE_COMPRESS(), true);
       conf.set("spark.io.compression.codec", compressionCodecName);
     } else {
-      conf.set("spark.shuffle.compress", "false");
+      conf.set(package$.MODULE$.SHUFFLE_COMPRESS(), false);
     }
-    conf.set(org.apache.spark.internal.config.package$.MODULE$.IO_ENCRYPTION_ENABLED(), encrypt);
+    conf.set(package$.MODULE$.IO_ENCRYPTION_ENABLED(), encrypt);
 
     SerializerManager manager;
     if (encrypt) {
@@ -390,7 +391,7 @@ public class UnsafeShuffleWriterSuite {
 
   @Test
   public void mergeSpillsWithCompressionAndEncryptionSlowPath() throws Exception {
-    conf.set("spark.shuffle.unsafe.fastMergeEnabled", "false");
+    conf.set(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE(), false);
     testMergingSpills(false, LZ4CompressionCodec.class.getName(), true);
   }
 
@@ -430,14 +431,14 @@ public class UnsafeShuffleWriterSuite {
 
   @Test
   public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception {
-    conf.set("spark.shuffle.sort.useRadixSort", "false");
+    conf.set(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT(), false);
     writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
     assertEquals(2, spillFilesCreated.size());
   }
 
   @Test
   public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() throws Exception {
-    conf.set("spark.shuffle.sort.useRadixSort", "true");
+    conf.set(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT(), true);
     writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
     assertEquals(3, spillFilesCreated.size());
   }
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index ecfebf8..e4e0d47 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -45,6 +45,7 @@ import org.apache.spark.storage.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.util.Utils;
+import org.apache.spark.internal.config.package$;
 
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
@@ -63,7 +64,7 @@ public abstract class AbstractBytesToBytesMapSuite {
   private TaskMemoryManager taskMemoryManager;
   private SerializerManager serializerManager = new SerializerManager(
       new JavaSerializer(new SparkConf()),
-      new SparkConf().set("spark.shuffle.spill.compress", "false"));
+      new SparkConf().set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false));
   private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
 
   final LinkedList<File> spillFilesCreated = new LinkedList<>();
@@ -79,8 +80,8 @@ public abstract class AbstractBytesToBytesMapSuite {
         new SparkConf()
           .set("spark.memory.offHeap.enabled", "" + 
useOffHeapMemoryAllocator())
           .set("spark.memory.offHeap.size", "256mb")
-          .set("spark.shuffle.spill.compress", "false")
-          .set("spark.shuffle.compress", "false"));
+          .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
+          .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
     taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
 
     tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index d1b29d9..a56743f 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -63,7 +63,7 @@ public class UnsafeExternalSorterSuite {
   final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
   final SerializerManager serializerManager = new SerializerManager(
     new JavaSerializer(conf),
-    conf.clone().set("spark.shuffle.spill.compress", "false"));
+    conf.clone().set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false));
   // Use integer comparison for comparing prefixes (which are partition ids, in this case)
   final PrefixComparator prefixComparator = PrefixComparators.LONG;
   // Since the key fits within the 8-byte prefix, we don't need to do any record comparison, so
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 1fcc975..9e28284 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
@@ -49,7 +49,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
     .set("spark.cleaner.referenceTracking.blocking", "true")
     .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
     .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
-    .set("spark.shuffle.manager", shuffleManager.getName)
+    .set(config.SHUFFLE_MANAGER, shuffleManager.getName)
 
   before {
     sc = new SparkContext(conf)
@@ -319,7 +319,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
       .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
-      .set("spark.shuffle.manager", shuffleManager.getName)
+      .set(config.SHUFFLE_MANAGER, shuffleManager.getName)
     sc = new SparkContext(conf2)
 
     val numRdds = 10
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 462d5f5..262e2a7 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -42,9 +42,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()
 
-    conf.set("spark.shuffle.manager", "sort")
-    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
-    conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString)
+    conf.set(config.SHUFFLE_MANAGER, "sort")
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(config.SHUFFLE_SERVICE_PORT, server.getPort)
   }
 
   override def afterAll() {
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index c088da8..adaa069 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -24,6 +24,7 @@ import org.mockito.Mockito._
 
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
 import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus}
 import org.apache.spark.shuffle.FetchFailedException
@@ -171,7 +172,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val newConf = new SparkConf
     newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
-    newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "1048576")
+    newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 1048576L)
 
     val masterTracker = newTrackerMaster(newConf)
     val rpcEnv = createRpcEnv("spark")
@@ -200,7 +201,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val newConf = new SparkConf
     newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
-    newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", 
Int.MaxValue.toString)
+    newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, Int.MaxValue.toLong)
 
     intercept[IllegalArgumentException] { newTrackerMaster(newConf) }
   }
@@ -244,7 +245,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val newConf = new SparkConf
     newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
-    newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "10240") // 10 
KiB << 1MiB framesize
+    newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 
1MiB framesize
 
     // needs TorrentBroadcast so need a SparkContext
     withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 3203f8f..8b1084a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
 import org.scalatest.Matchers
 
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
-import org.apache.spark.internal.config.SERIALIZER
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
@@ -42,7 +42,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   conf.set(TEST_NO_STAGE_RETRY, true)
 
   test("groupByKey without compression") {
-    val myConf = conf.clone().set("spark.shuffle.compress", "false")
+    val myConf = conf.clone().set(config.SHUFFLE_COMPRESS, false)
     sc = new SparkContext("local", "test", myConf)
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
     val groups = pairs.groupByKey(4).collect()
@@ -216,7 +216,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
 
   test("sort with Java non serializable class - Kryo") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    val myConf = conf.clone().set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    val myConf = conf.clone().set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
@@ -252,8 +252,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
       val myConf = conf.clone()
         .setAppName("test")
         .setMaster("local")
-        .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
-        .set("spark.shuffle.compress", shuffleCompress.toString)
+        .set(config.SHUFFLE_SPILL_COMPRESS, shuffleSpillCompress)
+        .set(config.SHUFFLE_COMPRESS, shuffleCompress)
       resetSparkContext()
       sc = new SparkContext(myConf)
       val diskBlockManager = sc.env.blockManager.diskBlockManager
@@ -263,8 +263,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
         assert(diskBlockManager.getAllFiles().nonEmpty)
       } catch {
         case e: Exception =>
-          val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
-            s" spark.shuffle.compress=$shuffleCompress"
+          val errMsg = s"Failed with ${config.SHUFFLE_SPILL_COMPRESS.key}=$shuffleSpillCompress," +
+            s" ${config.SHUFFLE_COMPRESS.key}=$shuffleCompress"
           throw new Exception(errMsg, e)
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index c0126e4..1aceda4 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -25,6 +25,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.commons.io.filefilter.TrueFileFilter
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.internal.config
 import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -42,7 +43,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
     // before/after a test, it could return the same directory even if this property
     // is configured.
     Utils.clearLocalRootDirs()
-    conf.set("spark.shuffle.manager", "sort")
+    conf.set(config.SHUFFLE_MANAGER, "sort")
   }
 
   override def beforeEach(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index e5d1bf4..abccb8e 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -75,8 +75,8 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
   test("serializer manager integration") {
     val conf = createConf()
-      .set("spark.shuffle.compress", "true")
-      .set("spark.shuffle.spill.compress", "true")
+      .set(SHUFFLE_COMPRESS, true)
+      .set(SHUFFLE_SPILL_COMPRESS, true)
 
     val plainStr = "hello world"
     val blockId = new TempShuffleBlockId(UUID.randomUUID())
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 41fb405..16eec7e 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -524,8 +524,8 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar
   conf.set(SERIALIZER, classOf[KryoSerializer].getName)
   conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
   conf.set(KRYO_REFERENCE_TRACKING, true)
-  conf.set("spark.shuffle.manager", "sort")
-  conf.set("spark.shuffle.sort.bypassMergeThreshold", "200")
+  conf.set(SHUFFLE_MANAGER, "sort")
+  conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200)
 
   test("sort-shuffle with bypassMergeSort (SPARK-7873)") {
     val myObject = ("Hello", "World")
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index eb97d5a..6d2ef17 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import org.mockito.Mockito.{mock, when}
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId}
@@ -123,8 +124,8 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
     val serializerManager = new SerializerManager(
       serializer,
       new SparkConf()
-        .set("spark.shuffle.compress", "false")
-        .set("spark.shuffle.spill.compress", "false"))
+        .set(config.SHUFFLE_COMPRESS, false)
+        .set(config.SHUFFLE_SPILL_COMPRESS, false))
 
     val taskContext = TaskContext.empty()
     val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 233a84e..04de0e4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -826,7 +826,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("block compression") {
     try {
-      conf.set("spark.shuffle.compress", "true")
+      conf.set(SHUFFLE_COMPRESS, true)
       var store = makeBlockManager(20000, "exec1")
       store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
@@ -834,7 +834,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         "shuffle_0_0_0 was not compressed")
       stopBlockManager(store)
 
-      conf.set("spark.shuffle.compress", "false")
+      conf.set(SHUFFLE_COMPRESS, false)
       store = makeBlockManager(20000, "exec2")
       store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
@@ -875,7 +875,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       stopBlockManager(store)
     } finally {
-      System.clearProperty("spark.shuffle.compress")
+      System.clearProperty(SHUFFLE_COMPRESS.key)
       System.clearProperty(BROADCAST_COMPRESS.key)
       System.clearProperty(RDD_COMPRESS.key)
     }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index de70153..43abb56 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -56,11 +56,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
     conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
     conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
-    conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
-    conf.set("spark.shuffle.compress", codec.isDefined.toString)
+    conf.set(SHUFFLE_SPILL_COMPRESS, codec.isDefined)
+    conf.set(SHUFFLE_COMPRESS, codec.isDefined)
     codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }
     // Ensure that we actually have multiple batches per spill file
-    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf.set(SHUFFLE_SPILL_BATCH_SIZE, 10L)
     conf
   }
 
@@ -253,7 +253,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   private def testSimpleSpilling(codec: Option[String] = None, encrypt: Boolean = false): Unit = {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults for Spark home
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4)
     conf.set(IO_ENCRYPTION_ENABLED, encrypt)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
@@ -297,7 +297,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   test("ExternalAppendOnlyMap shouldn't fail when forced to spill before 
calling its iterator") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[String]
     val consumer = createExternalMap[String]
@@ -308,7 +308,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   test("spilling with hash collisions") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[String]
 
@@ -359,7 +359,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   test("spilling with many hash collisions") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
     val map =
@@ -388,7 +388,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   test("spilling with hash collisions using the Int.MaxValue key") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[Int]
 
@@ -407,7 +407,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   test("spilling with null keys and values") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val map = createExternalMap[Int]
 
@@ -533,7 +533,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
   test("SPARK-22713 external aggregation updates peak execution memory") {
     val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.spill.numElementsForceSpillThreshold", 
spillThreshold.toString)
+      .set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, spillThreshold)
     sc = new SparkContext("local", "test", conf)
     // No spilling
     AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") {
@@ -553,7 +553,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     val conf = createSparkConf(loadDefaults = false)
       .set("spark.memory.storageFraction", "0.999")
       .set(TEST_MEMORY, 471859200L)
-      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+      .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0)
     sc = new SparkContext("local", "test", conf)
     val N = 200000
     sc.parallelize(1 to N, 2)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 3006409..d6c1562 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -124,7 +124,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   test("spilling with hash collisions") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, kryo = false)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
@@ -185,7 +185,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   test("spilling with many hash collisions") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, kryo = false)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
     val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -208,7 +208,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   test("spilling with hash collisions using the Int.MaxValue key") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, kryo = false)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
@@ -234,7 +234,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   test("spilling with null keys and values") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, kryo = false)
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
@@ -276,10 +276,10 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
       conf.set(SERIALIZER, classOf[JavaSerializer].getName)
     }
-    conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
+    conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0)
     // Ensure that we actually have multiple batches per spill file
-    conf.set("spark.shuffle.spill.batchSize", "10")
-    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
+    conf.set(SHUFFLE_SPILL_BATCH_SIZE, 10L)
+    conf.set(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, 512L)
     conf
   }
 
@@ -302,7 +302,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
    * =========================================== */
 
   private def emptyDataStream(conf: SparkConf) {
-    conf.set("spark.shuffle.manager", "sort")
+    conf.set(SHUFFLE_MANAGER, "sort")
     sc = new SparkContext("local", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
@@ -335,7 +335,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   private def fewElementsPerPartition(conf: SparkConf) {
-    conf.set("spark.shuffle.manager", "sort")
+    conf.set(SHUFFLE_MANAGER, "sort")
     sc = new SparkContext("local", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
@@ -377,8 +377,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
   private def emptyPartitionsWithSpilling(conf: SparkConf) {
     val size = 1000
-    conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_MANAGER, "sort")
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local", "test", conf)
     val context = MemoryTestingUtils.fakeTaskContext(sc.env)
 
@@ -402,8 +402,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
   private def testSpillingInLocalCluster(conf: SparkConf, numReduceTasks: Int) {
     val size = 5000
-    conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
4).toString)
+    conf.set(SHUFFLE_MANAGER, "sort")
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     assertSpilled(sc, "reduceByKey") {
@@ -462,8 +462,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   private def cleanupIntermediateFilesInSorter(withFailures: Boolean): Unit = {
     val size = 1200
     val conf = createSparkConf(loadDefaults = false, kryo = false)
-    conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
4).toString)
+    conf.set(SHUFFLE_MANAGER, "sort")
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4)
     sc = new SparkContext("local", "test", conf)
     val diskBlockManager = sc.env.blockManager.diskBlockManager
     val ord = implicitly[Ordering[Int]]
@@ -491,8 +491,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   private def cleanupIntermediateFilesInShuffle(withFailures: Boolean): Unit = {
     val size = 1200
     val conf = createSparkConf(loadDefaults = false, kryo = false)
-    conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
4).toString)
+    conf.set(SHUFFLE_MANAGER, "sort")
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4)
     sc = new SparkContext("local", "test", conf)
     val diskBlockManager = sc.env.blockManager.diskBlockManager
     val data = sc.parallelize(0 until size, 2).map { i =>
@@ -527,9 +527,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       withSpilling: Boolean) {
     val size = 1000
     if (withSpilling) {
-      conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+      conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     }
-    conf.set("spark.shuffle.manager", "sort")
+    conf.set(SHUFFLE_MANAGER, "sort")
     sc = new SparkContext("local", "test", conf)
     val agg =
       if (withPartialAgg) {
@@ -561,8 +561,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   private def sortWithoutBreakingSortingContracts(conf: SparkConf) {
     val size = 100000
     val conf = createSparkConf(loadDefaults = true, kryo = false)
-    conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 
2).toString)
+    conf.set(SHUFFLE_MANAGER, "sort")
+    conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
     // Using wrongOrdering to show integer overflow introduced exception.
@@ -619,8 +619,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   test("sorting updates peak execution memory") {
     val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false, kryo = false)
-      .set("spark.shuffle.manager", "sort")
-      .set("spark.shuffle.spill.numElementsForceSpillThreshold", 
spillThreshold.toString)
+      .set(SHUFFLE_MANAGER, "sort")
+      .set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, spillThreshold)
     sc = new SparkContext("local", "test", conf)
     // Avoid aggregating here to make sure we're not also using ExternalAppendOnlyMap
     // No spilling
@@ -641,7 +641,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val conf = createSparkConf(loadDefaults = false, kryo = false)
       .set("spark.memory.storageFraction", "0.999")
       .set(TEST_MEMORY, 471859200L)
-      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+      .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0)
     sc = new SparkContext("local", "test", conf)
     val N = 200000
     val p = new org.apache.spark.HashPartitioner(2)
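    The typed entries used in the test hunks above (SHUFFLE_MANAGER, SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD,
    SHUFFLE_SPILL_BATCH_SIZE, ...) are declared once in org.apache.spark.internal.config. A minimal
    sketch of the ConfigBuilder pattern follows; the enclosing package and object names are
    hypothetical, and the defaults are assumptions taken from the old hardcoded fallbacks, not the
    authoritative values in the config package:

    package org.apache.spark.sketch  // hypothetical; must sit under org.apache.spark for private[spark] access

    import org.apache.spark.internal.config.ConfigBuilder

    object ShuffleConfigSketch {
      // Default matches the old hardcoded fallback (conf.getInt(..., 200)) removed below.
      val SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD =
        ConfigBuilder("spark.shuffle.sort.bypassMergeThreshold")
          .intConf
          .createWithDefault(200)

      // Long-typed entry; the tests above set it with 512L. The default here is an assumption.
      val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD =
        ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold")
          .longConf
          .createWithDefault(5L * 1024 * 1024)
    }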
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 5395e40..d6edddf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -40,7 +40,6 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator;
 
 public final class UnsafeExternalRowSorter {
 
-  static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
   /**
    * If positive, forces records to be spilled to disk at the given frequency (measured in numbers
    * of records). This is only intended to be used in tests.
@@ -112,8 +111,7 @@ public final class UnsafeExternalRowSorter {
       taskContext,
       recordComparatorSupplier,
       prefixComparator,
-      sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
-                             DEFAULT_INITIAL_SORT_BUFFER_SIZE),
+      (int) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
       pageSizeBytes,
       (int) SparkEnv.get().conf().get(
         package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
index 8da7788..16452b4 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.internal.config.package$;
 
 import java.util.Random;
 
@@ -105,8 +106,8 @@ public class RowBasedKeyValueBatchSuite {
   public void setup() {
     memoryManager = new TestMemoryManager(new SparkConf()
             .set("spark.memory.offHeap.enabled", "false")
-            .set("spark.shuffle.spill.compress", "false")
-            .set("spark.shuffle.compress", "false"));
+            .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
+            .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
     taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9eb0343..5b12633 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -93,8 +94,7 @@ public final class UnsafeKVExternalSorter {
         taskContext,
         comparatorSupplier,
         prefixComparator,
-        SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
-                                     UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
+        (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
         pageSizeBytes,
         numElementsForSpillThreshold,
         canUseRadixSort);
@@ -160,8 +160,7 @@ public final class UnsafeKVExternalSorter {
         taskContext,
         comparatorSupplier,
         prefixComparator,
-        SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
-                                     UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
+        (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
         pageSizeBytes,
         numElementsForSpillThreshold,
         inMemSorter);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index da7b0c6..16398e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -21,6 +21,7 @@ import java.util.Random
 import java.util.function.Supplier
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
@@ -172,7 +173,7 @@ object ShuffleExchangeExec {
     val conf = SparkEnv.get.conf
     val shuffleManager = SparkEnv.get.shuffleManager
     val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
-    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val bypassMergeThreshold = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
     val numParts = partitioner.numPartitions
     if (sortBasedShuffleOn) {
       if (numParts <= bypassMergeThreshold) {
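    Reading an entry, as ShuffleExchangeExec now does, returns the entry's native type and falls back
    to the default carried by the entry itself, so the 200 no longer has to be repeated at call sites.
    A minimal sketch under the same assumptions (hypothetical package so the private[spark] entry and
    conf accessors resolve):

    package org.apache.spark.sketch  // hypothetical

    import org.apache.spark.SparkEnv
    import org.apache.spark.internal.config

    object BypassThresholdSketch {
      // conf.get(entry) is typed: this returns an Int, using the default declared
      // on SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD when the key is unset.
      def bypassMergeThreshold: Int =
        SparkEnv.get.conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    }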
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 963e425..1640a96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
 import java.util.Properties
 
 import org.apache.spark._
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.RDD
@@ -98,9 +99,10 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
 
   test("SPARK-10466: external sorter spilling with unsafe row serializer") {
     val conf = new SparkConf()
-      .set("spark.shuffle.spill.initialMemoryThreshold", "1")
-      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+      .set(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, 1L)
+      .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0)
       .set(TEST_MEMORY, 80000L)
+
     spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
     val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
     outputFile.deleteOnExit()
@@ -127,7 +129,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
   }
 
   test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
-    val conf = new SparkConf().set("spark.shuffle.manager", "sort")
+    val conf = new SparkConf().set(SHUFFLE_MANAGER, "sort")
     spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
     val row = Row("Hello", 123)
     val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
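    On the test side, the ConfigEntry overload of SparkConf.set takes a value of the entry's type, so a
    mismatch such as passing the string "0" to an Int entry fails at compile time rather than when the
    value is parsed. A small sketch under the same assumptions (hypothetical package; entries as used
    in the hunks above):

    package org.apache.spark.sketch  // hypothetical

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._

    object TypedTestConfSketch {
      // Values are native types, mirroring the test changes above.
      def testConf(): SparkConf = new SparkConf()
        .set(SHUFFLE_MANAGER, "sort")
        .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0)
        .set(SHUFFLE_SPILL_BATCH_SIZE, 10L)
        .set(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, 512L)
    }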


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
