This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 47991b074a5 [SPARK-38798][CORE] Make `spark.file.transferTo` as an 
`ConfigEntry`
47991b074a5 is described below

commit 47991b074a5a277e1fb75be3a5cc207f400b0b0c
Author: yangjie01 <[email protected]>
AuthorDate: Fri Apr 8 11:26:15 2022 +0900

    [SPARK-38798][CORE] Make `spark.file.transferTo` as an `ConfigEntry`
    
    ### What changes were proposed in this pull request?
    This pr make `spark.file.transferTo` as an `ConfigEntry` and move it into 
`org.apache.spark.internal.config`.
    
    ### Why are the changes needed?
    Use `ConfigEntry` instead of literal.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GA
    
    Closes #36079 from LuciferYang/transferTo-config.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java   | 2 +-
 .../java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java   | 2 +-
 .../src/main/scala/org/apache/spark/internal/config/package.scala | 8 ++++++++
 .../org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java   | 3 ++-
 .../spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala    | 3 ++-
 5 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index da7a51854cc..d067c870acc 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -118,7 +118,7 @@ final class BypassMergeSortShuffleWriter<K, V>
       ShuffleExecutorComponents shuffleExecutorComponents) throws 
SparkException {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
     this.fileBufferSize = (int) (long) 
conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
-    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
+    this.transferToEnabled = (boolean) 
conf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index b1779a135b7..9c541841059 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -135,7 +135,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
     this.shuffleExecutorComponents = shuffleExecutorComponents;
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
-    this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", 
true);
+    this.transferToEnabled = (boolean) 
sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
     this.initialSortBufferSize =
       (int) (long) 
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
     this.inputBufferSizeInBytes =
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aa8f63e14ef..b67250b7b84 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1494,6 +1494,14 @@ package object config {
       .longConf
       .createWithDefault(10000)
 
+  private[spark] val SHUFFLE_MERGE_PREFER_NIO =
+    ConfigBuilder("spark.file.transferTo")
+      .doc("If true, NIO's `transferTo` API will be preferentially used when 
merging " +
+        "Spark shuffle spill files")
+      .version("1.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD =
     ConfigBuilder("spark.shuffle.sort.bypassMergeThreshold")
       .doc("In the sort-based shuffle manager, avoid merge-sorting data if 
there is no " +
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 8a3df5a9d09..a865e79077a 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -181,7 +181,8 @@ public class UnsafeShuffleWriterSuite implements 
ShuffleChecksumTestHelper {
   private UnsafeShuffleWriter<Object, Object> createWriter(
     boolean transferToEnabled,
     IndexShuffleBlockResolver blockResolver) throws SparkException {
-    conf.set("spark.file.transferTo", String.valueOf(transferToEnabled));
+    conf.set(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO().key(),
+      String.valueOf(transferToEnabled));
     return new UnsafeShuffleWriter<>(
       blockManager,
       taskMemoryManager,
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 83bd3b0a997..ce2aefa7422 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -169,7 +169,8 @@ class BypassMergeSortShuffleWriterSuite
 
   Seq(true, false).foreach { transferTo =>
     test(s"write with some empty partitions - transferTo $transferTo") {
-      val transferConf = conf.clone.set("spark.file.transferTo", 
transferTo.toString)
+      val transferConf =
+        conf.clone.set(config.SHUFFLE_MERGE_PREFER_NIO.key, 
transferTo.toString)
       def records: Iterator[(Int, Int)] =
         Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
       val writer = new BypassMergeSortShuffleWriter[Int, Int](


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to