This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1b575ef5  [SPARK-26621][CORE] Use ConfigEntry for hardcoded configs for shuffle categories.
1b575ef5 is described below

commit 1b575ef5d1b8e3e672b2fca5c354d6678bd78bd1
Author: liuxian <liu.xi...@zte.com.cn>
AuthorDate: Thu Jan 17 12:29:17 2019 -0600

    [SPARK-26621][CORE] Use ConfigEntry for hardcoded configs for shuffle categories.

    ## What changes were proposed in this pull request?

    This PR changes the hardcoded `spark.shuffle` configs to use ConfigEntry and puts them in the
    config package. (A minimal sketch of the pattern appears after the diff at the end of this message.)

    ## How was this patch tested?

    Existing unit tests.

    Closes #23550 from 10110346/ConfigEntry_shuffle.

    Authored-by: liuxian <liu.xi...@zte.com.cn>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java |  3 +-
 .../spark/shuffle/sort/ShuffleExternalSorter.java  |  2 +-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java    | 10 +--
 .../scala/org/apache/spark/MapOutputTracker.scala  | 13 ++--
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  4 +-
 .../org/apache/spark/internal/config/package.scala | 90 ++++++++++++++++++++++
 .../spark/serializer/SerializerManager.scala       |  4 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala    |  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala     |  4 +-
 .../org/apache/spark/storage/BlockManager.scala    |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../util/collection/ExternalAppendOnlyMap.scala    |  7 +-
 .../spark/util/collection/ExternalSorter.scala     |  6 +-
 .../apache/spark/util/collection/Spillable.scala   |  2 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     | 15 ++--
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   |  7 +-
 .../unsafe/sort/UnsafeExternalSorterSuite.java     |  2 +-
 .../org/apache/spark/ContextCleanerSuite.scala     |  6 +-
 .../apache/spark/ExternalShuffleServiceSuite.scala |  6 +-
 .../org/apache/spark/MapOutputTrackerSuite.scala   |  7 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala | 14 ++--
 .../scala/org/apache/spark/SortShuffleSuite.scala  |  3 +-
 .../spark/security/CryptoStreamUtilsSuite.scala    |  4 +-
 .../spark/serializer/KryoSerializerSuite.scala     |  4 +-
 .../shuffle/BlockStoreShuffleReaderSuite.scala     |  5 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  6 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    | 22 +++---
 .../util/collection/ExternalSorterSuite.scala      | 48 ++++++------
 .../sql/execution/UnsafeExternalRowSorter.java     |  4 +-
 .../expressions/RowBasedKeyValueBatchSuite.java    |  5 +-
 .../sql/execution/UnsafeKVExternalSorter.java      |  7 +-
 .../execution/exchange/ShuffleExchangeExec.scala   |  3 +-
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  8 +-
 33 files changed, 211 insertions(+), 116 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 997bc9e..32b4467 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -34,6 +34,7 @@ import com.google.common.io.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.spark.internal.config.package$; import org.apache.spark.Partitioner; import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; @@ -104,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> { SparkConf conf, ShuffleWriteMetricsReporter writeMetrics) { // Use getSizeAsKb (not bytes) to
maintain backwards compatibility if no units are provided - this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; + this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); this.blockManager = blockManager; final ShuffleDependency<K, V, V> dep = handle.dependency(); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index dc43215..0247560 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -129,7 +129,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()); this.writeMetrics = writeMetrics; this.inMemSorter = new ShuffleInMemorySorter( - this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true)); + this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT())); this.peakMemoryUsedBytes = getMemoryUsage(); this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 4b0c743..3608106 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.spark.*; import org.apache.spark.annotation.Private; +import org.apache.spark.internal.config.package$; import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.NioBufferedFileInputStream; @@ -55,7 +56,6 @@ import org.apache.spark.storage.BlockManager; import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.Utils; -import org.apache.spark.internal.config.package$; @Private public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { @@ -143,8 +143,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); - this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize", - DEFAULT_INITIAL_SORT_BUFFER_SIZE); + this.initialSortBufferSize = + (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()); this.inputBufferSizeInBytes = (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; this.outputBufferSizeInBytes = @@ -282,10 +282,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { * @return the partition lengths in the merged file. 
*/ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException { - final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true); + final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS()); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = - sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true); + (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE()); final boolean fastMergeIsSupported = !compressionEnabled || CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1c4fa4b..a8b8e96 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -322,11 +322,10 @@ private[spark] class MapOutputTrackerMaster( extends MapOutputTracker(conf) { // The size at which we use Broadcast to send the map output statuses to the executors - private val minSizeForBroadcast = - conf.getSizeAsBytes("spark.shuffle.mapOutput.minSizeForBroadcast", "512k").toInt + private val minSizeForBroadcast = conf.get(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST).toInt /** Whether to compute locality preferences for reduce tasks */ - private val shuffleLocalityEnabled = conf.getBoolean("spark.shuffle.reduceLocality.enabled", true) + private val shuffleLocalityEnabled = conf.get(SHUFFLE_REDUCE_LOCALITY_ENABLE) // Number of map and reduce tasks above which we do not assign preferred locations based on map // output sizes. We limit the size of jobs for which assign preferred locations as computing the @@ -353,7 +352,7 @@ private[spark] class MapOutputTrackerMaster( // Thread pool used for handling map output status requests. This is a separate thread pool // to ensure we don't block the normal dispatcher threads. private val threadpool: ThreadPoolExecutor = { - val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8) + val numThreads = conf.get(SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS) val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher") for (i <- 0 until numThreads) { pool.execute(new MessageLoop) @@ -364,9 +363,9 @@ private[spark] class MapOutputTrackerMaster( // Make sure that we aren't going to exceed the max RPC message size by making sure // we use broadcast to send large map output statuses. if (minSizeForBroadcast > maxRpcMessageSize) { - val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " + - s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " + - "message that is too large." + val msg = s"${SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST.key} ($minSizeForBroadcast bytes) " + + s"must be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an " + + "rpc message that is too large." 
logError(msg) throw new IllegalArgumentException(msg) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ff4a043..36998d1 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem @@ -318,7 +318,7 @@ object SparkEnv extends Logging { val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) - val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") + val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER) val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 99ce220..6488d53 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -749,6 +749,96 @@ package object config { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("1h") + private[spark] val SHUFFLE_SORT_INIT_BUFFER_SIZE = + ConfigBuilder("spark.shuffle.sort.initialBufferSize") + .internal() + .intConf + .checkValue(v => v > 0, "The value should be a positive integer.") + .createWithDefault(4096) + + private[spark] val SHUFFLE_COMPRESS = + ConfigBuilder("spark.shuffle.compress") + .doc("Whether to compress shuffle output. Compression will use " + + "spark.io.compression.codec.") + .booleanConf + .createWithDefault(true) + + private[spark] val SHUFFLE_SPILL_COMPRESS = + ConfigBuilder("spark.shuffle.spill.compress") + .doc("Whether to compress data spilled during shuffles. 
Compression will use " + + "spark.io.compression.codec.") + .booleanConf + .createWithDefault(true) + + private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD = + ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold") + .internal() + .doc("Initial threshold for the size of a collection before we start tracking its " + + "memory usage.") + .longConf + .createWithDefault(5 * 1024 * 1024) + + private[spark] val SHUFFLE_SPILL_BATCH_SIZE = + ConfigBuilder("spark.shuffle.spill.batchSize") + .internal() + .doc("Size of object batches when reading/writing from serializers.") + .longConf + .createWithDefault(10000) + + private[spark] val SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD = + ConfigBuilder("spark.shuffle.sort.bypassMergeThreshold") + .doc("In the sort-based shuffle manager, avoid merge-sorting data if there is no " + + "map-side aggregation and there are at most this many reduce partitions") + .intConf + .createWithDefault(200) + + private[spark] val SHUFFLE_MANAGER = + ConfigBuilder("spark.shuffle.manager") + .stringConf + .createWithDefault("sort") + + private[spark] val SHUFFLE_REDUCE_LOCALITY_ENABLE = + ConfigBuilder("spark.shuffle.reduceLocality.enabled") + .doc("Whether to compute locality preferences for reduce tasks") + .booleanConf + .createWithDefault(true) + + private[spark] val SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST = + ConfigBuilder("spark.shuffle.mapOutput.minSizeForBroadcast") + .doc("The size at which we use Broadcast to send the map output statuses to the executors.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("512k") + + private[spark] val SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS = + ConfigBuilder("spark.shuffle.mapOutput.dispatcher.numThreads") + .intConf + .createWithDefault(8) + + private[spark] val SHUFFLE_DETECT_CORRUPT = + ConfigBuilder("spark.shuffle.detectCorrupt") + .doc("Whether to detect any corruption in fetched blocks.") + .booleanConf + .createWithDefault(true) + + private[spark] val SHUFFLE_SYNC = + ConfigBuilder("spark.shuffle.sync") + .doc("Whether to force outstanding writes to disk.") + .booleanConf + .createWithDefault(false) + + private[spark] val SHUFFLE_UNDAFE_FAST_MERGE_ENABLE = + ConfigBuilder("spark.shuffle.unsafe.fastMergeEnabled") + .doc("Whether to perform a fast spill merge.") + .booleanConf + .createWithDefault(true) + + private[spark] val SHUFFLE_SORT_USE_RADIXSORT = + ConfigBuilder("spark.shuffle.sort.useRadixSort") + .doc("Whether to use radix sort for sorting in-memory partition ids. 
Radix sort is much " + + "faster, but requires additional memory to be reserved memory as pointers are added.") + .booleanConf + .createWithDefault(true) + private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS = ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress") .internal() diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 1e233ca..3e3c387 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -66,11 +66,11 @@ private[spark] class SerializerManager( // Whether to compress broadcast variables that are stored private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS) // Whether to compress shuffle output that are stored - private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true) + private[this] val compressShuffle = conf.get(config.SHUFFLE_COMPRESS) // Whether to compress RDD partitions that are stored serialized private[this] val compressRdds = conf.get(config.RDD_COMPRESS) // Whether to compress shuffle output temporarily spilled to disk - private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true) + private[this] val compressShuffleSpill = conf.get(config.SHUFFLE_SPILL_COMPRESS) /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay * the initialization of the compression codec until it is first used. The reason is that a Spark diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 37f5169..daafe30 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -54,7 +54,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT), SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS), SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM), - SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true), + SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT), readMetrics) val serializerInstance = dep.serializer.newInstance() diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 274399b..16058de 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -18,7 +18,7 @@ package org.apache.spark.shuffle.sort import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter} import org.apache.spark.storage.ShuffleBlockId @@ -108,7 +108,7 @@ private[spark] object SortShuffleWriter { if (dep.mapSideCombine) { false } else { - val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD) dep.partitioner.numPartitions <= bypassMergeThreshold } } diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5bfe778..8f993bf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -942,7 +942,7 @@ private[spark] class BlockManager( serializerInstance: SerializerInstance, bufferSize: Int, writeMetrics: ShuffleWriteMetricsReporter): DiskBlockObjectWriter = { - val syncWrites = conf.getBoolean("spark.shuffle.sync", false) + val syncWrites = conf.get(config.SHUFFLE_SYNC) new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize, syncWrites, writeMetrics, blockId) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7416559..1a6dc1f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -812,7 +812,7 @@ private[spark] object Utils extends Logging { } else { if (conf.getenv("MESOS_SANDBOX") != null && shuffleServiceEnabled) { logInfo("MESOS_SANDBOX available but not using provided Mesos sandbox because " + - "spark.shuffle.service.enabled is enabled.") + s"${config.SHUFFLE_SERVICE_ENABLED.key} is enabled.") } // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user // configuration to point to a secure directory. So create a subdirectory with restricted diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 19ff109..1ba3b78 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer.{DeserializationStream, Serializer, SerializerManager} import org.apache.spark.storage.{BlockId, BlockManager} import org.apache.spark.util.CompletionIterator @@ -97,15 +97,14 @@ class ExternalAppendOnlyMap[K, V, C]( * NOTE: Setting this too low can cause excessive copying when serializing, since some serializers * grow internal data structures by growing + copying every time the number of objects doubles. 
*/ - private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000) + private val serializerBatchSize = sparkConf.get(config.SHUFFLE_SPILL_BATCH_SIZE) // Number of bytes spilled in total private var _diskBytesSpilled = 0L def diskBytesSpilled: Long = _diskBytesSpilled // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - private val fileBufferSize = - sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 + private val fileBufferSize = sparkConf.get(config.SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024 // Write metrics private val writeMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 46279e7..4806c13 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer._ import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter} @@ -109,7 +109,7 @@ private[spark] class ExternalSorter[K, V, C]( private val serInstance = serializer.newInstance() // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided - private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 + private val fileBufferSize = conf.get(config.SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024 // Size of object batches when reading/writing from serializers. // @@ -118,7 +118,7 @@ private[spark] class ExternalSorter[K, V, C]( // // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers // grow internal data structures by growing + copying every time the number of objects doubles. - private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) + private val serializerBatchSize = conf.get(config.SHUFFLE_SPILL_BATCH_SIZE) // Data structures to store in-memory objects before we spill. 
Depending on whether we have an // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 81457b5..bfc0fac 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -51,7 +51,7 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager) // Initial threshold for the size of a collection before we start tracking its memory usage // For testing only private[this] val initialMemoryThreshold: Long = - SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024) + SparkEnv.get.conf.get(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD) // Force this collection to spill when there are this many elements in memory // For testing only diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index aa5082f..f34ae99 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -45,6 +45,7 @@ import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.LZ4CompressionCodec; import org.apache.spark.io.LZFCompressionCodec; import org.apache.spark.io.SnappyCompressionCodec; +import org.apache.spark.internal.config.package$; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TestMemoryManager; import org.apache.spark.network.util.LimitedInputStream; @@ -184,7 +185,7 @@ public class UnsafeShuffleWriterSuite { fin.getChannel().position(startOffset); InputStream in = new LimitedInputStream(fin, partitionSize); in = blockManager.serializerManager().wrapForEncryption(in); - if (conf.getBoolean("spark.shuffle.compress", true)) { + if ((boolean) conf.get(package$.MODULE$.SHUFFLE_COMPRESS())) { in = CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(in); } try (DeserializationStream recordsStream = serializer.newInstance().deserializeStream(in)) { @@ -279,12 +280,12 @@ public class UnsafeShuffleWriterSuite { String compressionCodecName, boolean encrypt) throws Exception { if (compressionCodecName != null) { - conf.set("spark.shuffle.compress", "true"); + conf.set(package$.MODULE$.SHUFFLE_COMPRESS(), true); conf.set("spark.io.compression.codec", compressionCodecName); } else { - conf.set("spark.shuffle.compress", "false"); + conf.set(package$.MODULE$.SHUFFLE_COMPRESS(), false); } - conf.set(org.apache.spark.internal.config.package$.MODULE$.IO_ENCRYPTION_ENABLED(), encrypt); + conf.set(package$.MODULE$.IO_ENCRYPTION_ENABLED(), encrypt); SerializerManager manager; if (encrypt) { @@ -390,7 +391,7 @@ public class UnsafeShuffleWriterSuite { @Test public void mergeSpillsWithCompressionAndEncryptionSlowPath() throws Exception { - conf.set("spark.shuffle.unsafe.fastMergeEnabled", "false"); + conf.set(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE(), false); testMergingSpills(false, LZ4CompressionCodec.class.getName(), true); } @@ -430,14 +431,14 @@ public class UnsafeShuffleWriterSuite { @Test public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception { - conf.set("spark.shuffle.sort.useRadixSort", "false"); + conf.set(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT(), false); 
writeEnoughRecordsToTriggerSortBufferExpansionAndSpill(); assertEquals(2, spillFilesCreated.size()); } @Test public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() throws Exception { - conf.set("spark.shuffle.sort.useRadixSort", "true"); + conf.set(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT(), true); writeEnoughRecordsToTriggerSortBufferExpansionAndSpill(); assertEquals(3, spillFilesCreated.size()); } diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index ecfebf8..e4e0d47 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -45,6 +45,7 @@ import org.apache.spark.storage.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.util.Utils; +import org.apache.spark.internal.config.package$; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; @@ -63,7 +64,7 @@ public abstract class AbstractBytesToBytesMapSuite { private TaskMemoryManager taskMemoryManager; private SerializerManager serializerManager = new SerializerManager( new JavaSerializer(new SparkConf()), - new SparkConf().set("spark.shuffle.spill.compress", "false")); + new SparkConf().set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)); private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes final LinkedList<File> spillFilesCreated = new LinkedList<>(); @@ -79,8 +80,8 @@ public abstract class AbstractBytesToBytesMapSuite { new SparkConf() .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator()) .set("spark.memory.offHeap.size", "256mb") - .set("spark.shuffle.spill.compress", "false") - .set("spark.shuffle.compress", "false")); + .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) + .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test"); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index d1b29d9..a56743f 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -63,7 +63,7 @@ public class UnsafeExternalSorterSuite { final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); final SerializerManager serializerManager = new SerializerManager( new JavaSerializer(conf), - conf.clone().set("spark.shuffle.spill.compress", "false")); + conf.clone().set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)); // Use integer comparison for comparing prefixes (which are partition ids, in this case) final PrefixComparator prefixComparator = PrefixComparators.LONG; // Since the key fits within the 8-byte prefix, we don't need to do any record comparison, so diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 1fcc975..9e28284 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -28,7 
+28,7 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.SpanSugar._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage._ @@ -49,7 +49,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So .set("spark.cleaner.referenceTracking.blocking", "true") .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true") - .set("spark.shuffle.manager", shuffleManager.getName) + .set(config.SHUFFLE_MANAGER, shuffleManager.getName) before { sc = new SparkContext(conf) @@ -319,7 +319,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { .setAppName("ContextCleanerSuite") .set("spark.cleaner.referenceTracking.blocking", "true") .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") - .set("spark.shuffle.manager", shuffleManager.getName) + .set(config.SHUFFLE_MANAGER, shuffleManager.getName) sc = new SparkContext(conf2) val numRdds = 10 diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 462d5f5..262e2a7 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -42,9 +42,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll { val transportContext = new TransportContext(transportConf, rpcHandler) server = transportContext.createServer() - conf.set("spark.shuffle.manager", "sort") - conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") - conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString) + conf.set(config.SHUFFLE_MANAGER, "sort") + conf.set(config.SHUFFLE_SERVICE_ENABLED, true) + conf.set(config.SHUFFLE_SERVICE_PORT, server.getPort) } override def afterAll() { diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index c088da8..adaa069 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -24,6 +24,7 @@ import org.mockito.Mockito._ import org.apache.spark.LocalSparkContext._ import org.apache.spark.broadcast.BroadcastManager +import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus} import org.apache.spark.shuffle.FetchFailedException @@ -171,7 +172,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val newConf = new SparkConf newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast - newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "1048576") + newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 1048576L) val masterTracker = newTrackerMaster(newConf) val rpcEnv = createRpcEnv("spark") @@ -200,7 +201,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val newConf = new SparkConf newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast - newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", Int.MaxValue.toString) + 
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, Int.MaxValue.toLong) intercept[IllegalArgumentException] { newTrackerMaster(newConf) } } @@ -244,7 +245,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val newConf = new SparkConf newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast - newConf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "10240") // 10 KiB << 1MiB framesize + newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 1MiB framesize // needs TorrentBroadcast so need a SparkContext withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 3203f8f..8b1084a 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService import org.scalatest.Matchers import org.apache.spark.ShuffleSuite.NonJavaSerializableClass -import org.apache.spark.internal.config.SERIALIZER +import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD} @@ -42,7 +42,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC conf.set(TEST_NO_STAGE_RETRY, true) test("groupByKey without compression") { - val myConf = conf.clone().set("spark.shuffle.compress", "false") + val myConf = conf.clone().set(config.SHUFFLE_COMPRESS, false) sc = new SparkContext("local", "test", myConf) val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) val groups = pairs.groupByKey(4).collect() @@ -216,7 +216,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC test("sort with Java non serializable class - Kryo") { // Use a local cluster with 2 processes to make sure there are both local and remote blocks - val myConf = conf.clone().set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + val myConf = conf.clone().set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer") sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf) val a = sc.parallelize(1 to 10, 2) val b = a.map { x => @@ -252,8 +252,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val myConf = conf.clone() .setAppName("test") .setMaster("local") - .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString) - .set("spark.shuffle.compress", shuffleCompress.toString) + .set(config.SHUFFLE_SPILL_COMPRESS, shuffleSpillCompress) + .set(config.SHUFFLE_COMPRESS, shuffleCompress) resetSparkContext() sc = new SparkContext(myConf) val diskBlockManager = sc.env.blockManager.diskBlockManager @@ -263,8 +263,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(diskBlockManager.getAllFiles().nonEmpty) } catch { case e: Exception => - val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," + - s" spark.shuffle.compress=$shuffleCompress" + val errMsg = s"Failed with ${config.SHUFFLE_SPILL_COMPRESS.key}=$shuffleSpillCompress," + + s" ${config.SHUFFLE_COMPRESS.key}=$shuffleCompress" throw new Exception(errMsg, e) } } diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index c0126e4..1aceda4 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -25,6 +25,7 @@ import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.TrueFileFilter import org.scalatest.BeforeAndAfterAll +import org.apache.spark.internal.config import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.sort.SortShuffleManager @@ -42,7 +43,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { // before/after a test, it could return the same directory even if this property // is configured. Utils.clearLocalRootDirs() - conf.set("spark.shuffle.manager", "sort") + conf.set(config.SHUFFLE_MANAGER, "sort") } override def beforeEach(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala index e5d1bf4..abccb8e 100644 --- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala @@ -75,8 +75,8 @@ class CryptoStreamUtilsSuite extends SparkFunSuite { test("serializer manager integration") { val conf = createConf() - .set("spark.shuffle.compress", "true") - .set("spark.shuffle.spill.compress", "true") + .set(SHUFFLE_COMPRESS, true) + .set(SHUFFLE_SPILL_COMPRESS, true) val plainStr = "hello world" val blockId = new TempShuffleBlockId(UUID.randomUUID()) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 41fb405..16eec7e 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -524,8 +524,8 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar conf.set(SERIALIZER, classOf[KryoSerializer].getName) conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) conf.set(KRYO_REFERENCE_TRACKING, true) - conf.set("spark.shuffle.manager", "sort") - conf.set("spark.shuffle.sort.bypassMergeThreshold", "200") + conf.set(SHUFFLE_MANAGER, "sort") + conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200) test("sort-shuffle with bypassMergeSort (SPARK-7873)") { val myObject = ("Hello", "World") diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index eb97d5a..6d2ef17 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import org.mockito.Mockito.{mock, when} import org.apache.spark._ +import org.apache.spark.internal.config import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId} @@ -123,8 +124,8 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext val serializerManager = new SerializerManager( serializer, new SparkConf() - .set("spark.shuffle.compress", "false") - 
.set("spark.shuffle.spill.compress", "false")) + .set(config.SHUFFLE_COMPRESS, false) + .set(config.SHUFFLE_SPILL_COMPRESS, false)) val taskContext = TaskContext.empty() val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 233a84e..04de0e4 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -826,7 +826,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("block compression") { try { - conf.set("spark.shuffle.compress", "true") + conf.set(SHUFFLE_COMPRESS, true) var store = makeBlockManager(20000, "exec1") store.putSingle( ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) @@ -834,7 +834,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE "shuffle_0_0_0 was not compressed") stopBlockManager(store) - conf.set("spark.shuffle.compress", "false") + conf.set(SHUFFLE_COMPRESS, false) store = makeBlockManager(20000, "exec2") store.putSingle( ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) @@ -875,7 +875,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed") stopBlockManager(store) } finally { - System.clearProperty("spark.shuffle.compress") + System.clearProperty(SHUFFLE_COMPRESS.key) System.clearProperty(BROADCAST_COMPRESS.key) System.clearProperty(RDD_COMPRESS.key) } diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index de70153..43abb56 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -56,11 +56,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite // for a bug we had with bytes written past the last object in a batch (SPARK-2792) conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1) conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer") - conf.set("spark.shuffle.spill.compress", codec.isDefined.toString) - conf.set("spark.shuffle.compress", codec.isDefined.toString) + conf.set(SHUFFLE_SPILL_COMPRESS, codec.isDefined) + conf.set(SHUFFLE_COMPRESS, codec.isDefined) codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) } // Ensure that we actually have multiple batches per spill file - conf.set("spark.shuffle.spill.batchSize", "10") + conf.set(SHUFFLE_SPILL_BATCH_SIZE, 10L) conf } @@ -253,7 +253,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite private def testSimpleSpilling(codec: Option[String] = None, encrypt: Boolean = false): Unit = { val size = 1000 val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4) conf.set(IO_ENCRYPTION_ENABLED, encrypt) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) @@ -297,7 +297,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator") { val size = 
1000 val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[String] val consumer = createExternalMap[String] @@ -308,7 +308,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("spilling with hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[String] @@ -359,7 +359,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("spilling with many hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) val map = @@ -388,7 +388,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("spilling with hash collisions using the Int.MaxValue key") { val size = 1000 val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[Int] @@ -407,7 +407,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("spilling with null keys and values") { val size = 1000 val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[Int] @@ -533,7 +533,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("SPARK-22713 external aggregation updates peak execution memory") { val spillThreshold = 1000 val conf = createSparkConf(loadDefaults = false) - .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString) + .set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, spillThreshold) sc = new SparkContext("local", "test", conf) // No spilling AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") { @@ -553,7 +553,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val conf = createSparkConf(loadDefaults = false) .set("spark.memory.storageFraction", "0.999") .set(TEST_MEMORY, 471859200L) - .set("spark.shuffle.sort.bypassMergeThreshold", "0") + .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0) sc = new SparkContext("local", "test", conf) val N = 200000 sc.parallelize(1 to N, 2) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 3006409..d6c1562 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -124,7 +124,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with 
hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) @@ -185,7 +185,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with many hash collisions") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) @@ -208,7 +208,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions using the Int.MaxValue key") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) @@ -234,7 +234,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with null keys and values") { val size = 1000 val conf = createSparkConf(loadDefaults = true, kryo = false) - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) @@ -276,10 +276,10 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1) conf.set(SERIALIZER, classOf[JavaSerializer].getName) } - conf.set("spark.shuffle.sort.bypassMergeThreshold", "0") + conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0) // Ensure that we actually have multiple batches per spill file - conf.set("spark.shuffle.spill.batchSize", "10") - conf.set("spark.shuffle.spill.initialMemoryThreshold", "512") + conf.set(SHUFFLE_SPILL_BATCH_SIZE, 10L) + conf.set(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, 512L) conf } @@ -302,7 +302,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { * =========================================== */ private def emptyDataStream(conf: SparkConf) { - conf.set("spark.shuffle.manager", "sort") + conf.set(SHUFFLE_MANAGER, "sort") sc = new SparkContext("local", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) @@ -335,7 +335,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } private def fewElementsPerPartition(conf: SparkConf) { - conf.set("spark.shuffle.manager", "sort") + conf.set(SHUFFLE_MANAGER, "sort") sc = new SparkContext("local", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) @@ -377,8 +377,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def emptyPartitionsWithSpilling(conf: SparkConf) { val size = 1000 - conf.set("spark.shuffle.manager", "sort") - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + 
conf.set(SHUFFLE_MANAGER, "sort") + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local", "test", conf) val context = MemoryTestingUtils.fakeTaskContext(sc.env) @@ -402,8 +402,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def testSpillingInLocalCluster(conf: SparkConf, numReduceTasks: Int) { val size = 5000 - conf.set("spark.shuffle.manager", "sort") - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) + conf.set(SHUFFLE_MANAGER, "sort") + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) assertSpilled(sc, "reduceByKey") { @@ -462,8 +462,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def cleanupIntermediateFilesInSorter(withFailures: Boolean): Unit = { val size = 1200 val conf = createSparkConf(loadDefaults = false, kryo = false) - conf.set("spark.shuffle.manager", "sort") - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) + conf.set(SHUFFLE_MANAGER, "sort") + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4) sc = new SparkContext("local", "test", conf) val diskBlockManager = sc.env.blockManager.diskBlockManager val ord = implicitly[Ordering[Int]] @@ -491,8 +491,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def cleanupIntermediateFilesInShuffle(withFailures: Boolean): Unit = { val size = 1200 val conf = createSparkConf(loadDefaults = false, kryo = false) - conf.set("spark.shuffle.manager", "sort") - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString) + conf.set(SHUFFLE_MANAGER, "sort") + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 4) sc = new SparkContext("local", "test", conf) val diskBlockManager = sc.env.blockManager.diskBlockManager val data = sc.parallelize(0 until size, 2).map { i => @@ -527,9 +527,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { withSpilling: Boolean) { val size = 1000 if (withSpilling) { - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) } - conf.set("spark.shuffle.manager", "sort") + conf.set(SHUFFLE_MANAGER, "sort") sc = new SparkContext("local", "test", conf) val agg = if (withPartialAgg) { @@ -561,8 +561,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def sortWithoutBreakingSortingContracts(conf: SparkConf) { val size = 100000 val conf = createSparkConf(loadDefaults = true, kryo = false) - conf.set("spark.shuffle.manager", "sort") - conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString) + conf.set(SHUFFLE_MANAGER, "sort") + conf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, size / 2) sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) // Using wrongOrdering to show integer overflow introduced exception. 
@@ -619,8 +619,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("sorting updates peak execution memory") { val spillThreshold = 1000 val conf = createSparkConf(loadDefaults = false, kryo = false) - .set("spark.shuffle.manager", "sort") - .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString) + .set(SHUFFLE_MANAGER, "sort") + .set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, spillThreshold) sc = new SparkContext("local", "test", conf) // Avoid aggregating here to make sure we're not also using ExternalAppendOnlyMap // No spilling @@ -641,7 +641,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = false, kryo = false) .set("spark.memory.storageFraction", "0.999") .set(TEST_MEMORY, 471859200L) - .set("spark.shuffle.sort.bypassMergeThreshold", "0") + .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0) sc = new SparkContext("local", "test", conf) val N = 200000 val p = new org.apache.spark.HashPartitioner(2) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 5395e40..d6edddf 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -40,7 +40,6 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator; public final class UnsafeExternalRowSorter { - static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; /** * If positive, forces records to be spilled to disk at the given frequency (measured in numbers * of records). This is only intended to be used in tests. 
@@ -112,8 +111,7 @@ public final class UnsafeExternalRowSorter { taskContext, recordComparatorSupplier, prefixComparator, - sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize", - DEFAULT_INITIAL_SORT_BUFFER_SIZE), + (int) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), pageSizeBytes, (int) SparkEnv.get().conf().get( package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()), diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java index 8da7788..16452b4 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java @@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.internal.config.package$; import java.util.Random; @@ -105,8 +106,8 @@ public class RowBasedKeyValueBatchSuite { public void setup() { memoryManager = new TestMemoryManager(new SparkConf() .set("spark.memory.offHeap.enabled", "false") - .set("spark.shuffle.spill.compress", "false") - .set("spark.shuffle.compress", "false")); + .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) + .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 9eb0343..5b12633 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; +import org.apache.spark.internal.config.package$; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.SerializerManager; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; @@ -93,8 +94,7 @@ public final class UnsafeKVExternalSorter { taskContext, comparatorSupplier, prefixComparator, - SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", - UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), + (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), pageSizeBytes, numElementsForSpillThreshold, canUseRadixSort); @@ -160,8 +160,7 @@ public final class UnsafeKVExternalSorter { taskContext, comparatorSupplier, prefixComparator, - SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", - UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), + (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), pageSizeBytes, numElementsForSpillThreshold, inMemSorter); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index da7b0c6..16398e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -21,6 +21,7 @@ import java.util.Random import java.util.function.Supplier import org.apache.spark._ +import org.apache.spark.internal.config import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} @@ -172,7 +173,7 @@ object ShuffleExchangeExec { val conf = SparkEnv.get.conf val shuffleManager = SparkEnv.get.shuffleManager val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] - val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + val bypassMergeThreshold = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD) val numParts = partitioner.numPartitions if (sortBasedShuffleOn) { if (numParts <= bypassMergeThreshold) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 963e425..1640a96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File} import java.util.Properties import org.apache.spark._ +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_MEMORY import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.RDD @@ -98,9 +99,10 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession { test("SPARK-10466: external sorter spilling with unsafe row serializer") { val conf = new SparkConf() - .set("spark.shuffle.spill.initialMemoryThreshold", "1") - .set("spark.shuffle.sort.bypassMergeThreshold", "0") + .set(SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD, 1L) + .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0) .set(TEST_MEMORY, 80000L) + spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate() val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "") outputFile.deleteOnExit() @@ -127,7 +129,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession { } test("SPARK-10403: unsafe row serializer with SortShuffleManager") { - val conf = new SparkConf().set("spark.shuffle.manager", "sort") + val conf = new SparkConf().set(SHUFFLE_MANAGER, "sort") spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate() val row = Row("Hello", 123) val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
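
A minimal sketch of the migration pattern this commit applies, for readers skimming the flattened diff above. The entry shown mirrors the SHUFFLE_COMPRESS definition added to core/src/main/scala/org/apache/spark/internal/config/package.scala in this change; treat it as an illustrative excerpt of the internal pattern, not standalone runnable code, since ConfigBuilder is private[spark].

    // Before: each call site hard-codes the key string and the default value.
    val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)

    // After: the key, type, doc and default are declared once, inside the
    // org.apache.spark.internal.config package object, as a typed ConfigEntry.
    private[spark] val SHUFFLE_COMPRESS =
      ConfigBuilder("spark.shuffle.compress")
        .doc("Whether to compress shuffle output. Compression will use " +
          "spark.io.compression.codec.")
        .booleanConf
        .createWithDefault(true)

    // Scala call sites then read a typed value, with the default applied automatically:
    val compressShuffle = conf.get(config.SHUFFLE_COMPRESS)

    // Java call sites go through the generated package object, as several files in
    // this diff do, e.g.:
    //   boolean compress = (boolean) conf.get(package$.MODULE$.SHUFFLE_COMPRESS());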