This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new aa3d16d [SPARK-26698][CORE] Use ConfigEntry for hardcoded configs for memory and storage categories aa3d16d is described below commit aa3d16d68b7ebd9210c330905f01590ef93d875c Author: SongYadong <song.yado...@zte.com.cn> AuthorDate: Fri Jan 25 22:28:12 2019 -0600 [SPARK-26698][CORE] Use ConfigEntry for hardcoded configs for memory and storage categories ## What changes were proposed in this pull request? This PR makes hardcoded configs about spark memory and storage to use `ConfigEntry` and put them in the config package. ## How was this patch tested? Existing unit tests. Closes #23623 from SongYadong/configEntry_for_mem_storage. Authored-by: SongYadong <song.yado...@zte.com.cn> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../org/apache/spark/deploy/worker/Worker.scala | 2 +- .../org/apache/spark/internal/config/package.scala | 20 ++++++++++++++++++++ .../org/apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/storage/DiskBlockManager.scala | 4 ++-- .../apache/spark/memory/TaskMemoryManagerSuite.java | 21 +++++++++++++-------- .../shuffle/sort/PackedRecordPointerSuite.java | 7 ++++--- .../shuffle/sort/ShuffleInMemorySorterSuite.java | 5 +++-- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../unsafe/map/AbstractBytesToBytesMapSuite.java | 4 ++-- .../unsafe/sort/UnsafeExternalSorterSuite.java | 2 +- .../unsafe/sort/UnsafeInMemorySorterSuite.java | 9 ++++++--- .../apache/spark/deploy/worker/WorkerSuite.scala | 3 ++- .../apache/spark/storage/BlockManagerSuite.scala | 4 ++-- .../collection/ExternalAppendOnlyMapSuite.scala | 2 +- .../spark/util/collection/ExternalSorterSuite.scala | 2 +- .../expressions/RowBasedKeyValueBatchSuite.java | 2 +- .../execution/sort/RecordBinaryComparatorSuite.java | 4 +++- .../spark/streaming/ReceivedBlockHandlerSuite.scala | 2 +- 18 files changed, 65 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 115450b..5f7ca5c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -102,7 +102,7 @@ private[deploy] class Worker( // Whether or not cleanup the non-shuffle files on executor exits. private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = - conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true) + conf.get(config.STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT) private var master: Option[RpcEndpointRef] = None diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b591284..2fb83d3 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -270,6 +270,26 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString) + private[spark] val STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT = + ConfigBuilder("spark.storage.cleanupFilesAfterExecutorExit") + .doc("Whether or not cleanup the non-shuffle files on executor exits.") + .booleanConf + .createWithDefault(true) + + private[spark] val DISKSTORE_SUB_DIRECTORIES = + ConfigBuilder("spark.diskStore.subDirectories") + .doc("Number of subdirectories inside each path listed in spark.local.dir for " + + "hashing Block files into.") + .intConf + .createWithDefault(64) + + private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH = + ConfigBuilder("spark.block.failures.beforeLocationRefresh") + .doc("Max number of failures before this block manager refreshes " + + "the block locations from the driver.") + .intConf + .createWithDefault(5) + private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal() .booleanConf.createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index dd05cb3..e02870cd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -196,7 +196,7 @@ private[spark] class BlockManager( // Max number of failures before this block manager refreshes the block locations from the driver private val maxFailuresBeforeLocationRefresh = - conf.getInt("spark.block.failures.beforeLocationRefresh", 5) + conf.get(config.BLOCK_FAILURES_BEFORE_LOCATION_REFRESH) private val slaveEndpoint = rpcEnv.setupEndpoint( "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index a69bcc9..95ce4b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -22,7 +22,7 @@ import java.util.UUID import org.apache.spark.SparkConf import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.{ShutdownHookManager, Utils} /** @@ -34,7 +34,7 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} */ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { - private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64) + private[spark] val subDirsPerLocalDir = conf.get(config.DISKSTORE_SUB_DIRECTORIES) /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index dc1fe77..d195254 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.apache.spark.SparkConf; import org.apache.spark.unsafe.memory.MemoryAllocator; import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.internal.config.package$; public class TaskMemoryManagerSuite { @@ -30,7 +31,7 @@ public class TaskMemoryManagerSuite { public void leakedPageMemoryIsDetected() { final TaskMemoryManager manager = new TaskMemoryManager( new UnifiedMemoryManager( - new SparkConf().set("spark.memory.offHeap.enabled", "false"), + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false), Long.MAX_VALUE, Long.MAX_VALUE, 1), @@ -44,8 +45,8 @@ public class TaskMemoryManagerSuite { @Test public void encodePageNumberAndOffsetOffHeap() { final SparkConf conf = new SparkConf() - .set("spark.memory.offHeap.enabled", "true") - .set("spark.memory.offHeap.size", "1000"); + .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true) + .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 1000L); final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); @@ -61,7 +62,8 @@ public class TaskMemoryManagerSuite { @Test public void encodePageNumberAndOffsetOnHeap() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64); @@ -72,7 +74,8 @@ public class TaskMemoryManagerSuite { @Test public void freeingPageSetsPageNumberToSpecialConstant() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); c.freePage(dataPage); @@ -82,7 +85,8 @@ public class TaskMemoryManagerSuite { @Test(expected = AssertionError.class) public void freeingPageDirectlyInAllocatorTriggersAssertionError() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); MemoryAllocator.HEAP.free(dataPage); @@ -91,7 +95,8 @@ public class TaskMemoryManagerSuite { @Test(expected = AssertionError.class) public void callingFreePageOnDirectlyAllocatedPageTriggersAssertionError() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); final MemoryBlock dataPage = MemoryAllocator.HEAP.allocate(256); manager.freePage(dataPage, c); @@ -199,7 +204,7 @@ public class TaskMemoryManagerSuite { // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251). final SparkConf conf = new SparkConf() .set("spark.unsafe.offHeap", "true") - .set("spark.memory.offHeap.size", "1000"); + .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 1000L); final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode); } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java index 354efe1..ccf47a4 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.junit.Test; import org.apache.spark.SparkConf; +import org.apache.spark.internal.config.package$; import org.apache.spark.memory.*; import org.apache.spark.unsafe.memory.MemoryBlock; @@ -34,7 +35,7 @@ public class PackedRecordPointerSuite { @Test public void heap() throws IOException { - final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false"); + final SparkConf conf = new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.ON_HEAP); @@ -55,8 +56,8 @@ public class PackedRecordPointerSuite { @Test public void offHeap() throws IOException { final SparkConf conf = new SparkConf() - .set("spark.memory.offHeap.enabled", "true") - .set("spark.memory.offHeap.size", "10000"); + .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true) + .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 10000L); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index 694352e..7a17d90 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -26,6 +26,7 @@ import org.junit.Test; import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; +import org.apache.spark.internal.config.package$; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TestMemoryConsumer; @@ -38,7 +39,7 @@ public class ShuffleInMemorySorterSuite { protected boolean shouldUseRadixSort() { return false; } final TestMemoryManager memoryManager = - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); + new TestMemoryManager(new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager); @@ -69,7 +70,7 @@ public class ShuffleInMemorySorterSuite { "Lychee", "Mango" }; - final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false"); + final SparkConf conf = new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryConsumer c = new TestMemoryConsumer(memoryManager); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index f34ae99..9bf707f 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -102,7 +102,7 @@ public class UnsafeShuffleWriterSuite { spillFilesCreated.clear(); conf = new SparkConf() .set("spark.buffer.pageSize", "1m") - .set("spark.memory.offHeap.enabled", "false"); + .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false); taskMetrics = new TaskMetrics(); memoryManager = new TestMemoryManager(conf); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index e4e0d47..8d03c67 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -78,8 +78,8 @@ public abstract class AbstractBytesToBytesMapSuite { memoryManager = new TestMemoryManager( new SparkConf() - .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator()) - .set("spark.memory.offHeap.size", "256mb") + .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), useOffHeapMemoryAllocator()) + .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 256 * 1024 * 1024L) .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index a56743f..dd71d32 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -59,7 +59,7 @@ public class UnsafeExternalSorterSuite { final LinkedList<File> spillFilesCreated = new LinkedList<>(); final TestMemoryManager memoryManager = - new TestMemoryManager(conf.clone().set("spark.memory.offHeap.enabled", "false")); + new TestMemoryManager(conf.clone().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); final SerializerManager serializerManager = new SerializerManager( new JavaSerializer(conf), diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index b0d485f..2b8a060 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -31,6 +31,7 @@ import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.internal.config.package$; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -52,7 +53,8 @@ public class UnsafeInMemorySorterSuite { @Test public void testSortingEmptyInput() { final TaskMemoryManager memoryManager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, @@ -78,7 +80,8 @@ public class UnsafeInMemorySorterSuite { "Mango" }; final TaskMemoryManager memoryManager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); final Object baseObject = dataPage.getBaseObject(); @@ -146,7 +149,7 @@ public class UnsafeInMemorySorterSuite { @Test public void freeAfterOOM() { final SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.memory.offHeap.enabled", "false"); + sparkConf.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false); final TestMemoryManager testMemoryManager = new TestMemoryManager(sparkConf); diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index 0ddf38c..f6559df 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService} import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged} import org.apache.spark.deploy.master.DriverState +import org.apache.spark.internal.config import org.apache.spark.internal.config.Worker._ import org.apache.spark.rpc.{RpcAddress, RpcEnv} @@ -224,7 +225,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { } private def testCleanupFilesWithConfig(value: Boolean) = { - val conf = new SparkConf().set("spark.storage.cleanupFilesAfterExecutorExit", value.toString) + val conf = new SparkConf().set(config.STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT, value) val cleanupCalled = new AtomicBoolean(false) when(shuffleService.executorRemoved(any[String], any[String])).thenAnswer(new Answer[Unit] { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 04de0e4..9a95567 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1242,7 +1242,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") { val mockBlockTransferService = - new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5)) + new MockBlockTransferService(conf.get(BLOCK_FAILURES_BEFORE_LOCATION_REFRESH)) val store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService)) store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true) @@ -1251,7 +1251,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") { val maxFailuresBeforeLocationRefresh = - conf.getInt("spark.block.failures.beforeLocationRefresh", 5) + conf.get(BLOCK_FAILURES_BEFORE_LOCATION_REFRESH) val mockBlockManagerMaster = mock(classOf[BlockManagerMaster]) val mockBlockTransferService = new MockBlockTransferService(maxFailuresBeforeLocationRefresh) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 43abb56..d5a20cc 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -551,7 +551,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite test("force to spill for external aggregation") { val conf = createSparkConf(loadDefaults = false) - .set("spark.memory.storageFraction", "0.999") + .set(MEMORY_STORAGE_FRACTION, 0.999) .set(TEST_MEMORY, 471859200L) .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0) sc = new SparkContext("local", "test", conf) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index d6c1562..bbc0b33 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -639,7 +639,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("force to spill for external sorter") { val conf = createSparkConf(loadDefaults = false, kryo = false) - .set("spark.memory.storageFraction", "0.999") + .set(MEMORY_STORAGE_FRACTION, 0.999) .set(TEST_MEMORY, 471859200L) .set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 0) sc = new SparkContext("local", "test", conf) diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java index 16452b4..f3d82b4 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java @@ -105,7 +105,7 @@ public class RowBasedKeyValueBatchSuite { @Before public void setup() { memoryManager = new TestMemoryManager(new SparkConf() - .set("spark.memory.offHeap.enabled", "false") + .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false) .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false) .set(package$.MODULE$.SHUFFLE_COMPRESS(), false)); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java index 97f3dc5..92dabc7 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java @@ -18,6 +18,7 @@ package test.org.apache.spark.sql.execution.sort; import org.apache.spark.SparkConf; +import org.apache.spark.internal.config.package$; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TestMemoryConsumer; import org.apache.spark.memory.TestMemoryManager; @@ -41,7 +42,8 @@ import org.junit.Test; public class RecordBinaryComparatorSuite { private final TaskMemoryManager memoryManager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + new TestMemoryManager( + new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0); private final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); private final int uaoSize = UnsafeAlignedOffset.getUaoSize(); diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index d1a6e8a..f442811 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -214,7 +214,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) test("Test Block - isFullyConsumed") { val sparkConf = new SparkConf().set("spark.app.id", "streaming-test") - sparkConf.set("spark.storage.unrollMemoryThreshold", "512") + sparkConf.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption) // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org