This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d84f1a3575c4 [SPARK-49217][CORE] Support separate buffer size configuration in UnsafeShuffleWriter
d84f1a3575c4 is described below

commit d84f1a3575c4125009374521d2f179089ebd71ad
Author: sychen <[email protected]>
AuthorDate: Fri Aug 23 02:35:30 2024 -0500

    [SPARK-49217][CORE] Support separate buffer size configuration in UnsafeShuffleWriter
    
    ### What changes were proposed in this pull request?
    This PR aims to support a separate buffer size configuration in `UnsafeShuffleWriter`.
    
    Introduce `spark.shuffle.file.merge.buffer` configuration.
    
    ### Why are the changes needed?
    
    `UnsafeShuffleWriter#mergeSpillsWithFileStream` uses `spark.shuffle.file.buffer` as the size of the buffer for reading spill files, and this buffer is an off-heap buffer.
    
    During the spill process we want the buffer size to be large, but when there are many spill files, `UnsafeShuffleWriter#mergeSpillsWithFileStream` allocates one off-heap buffer per spill file, so the total off-heap memory grows with the number of files and the executor is easily killed by YARN.
    
    
    https://github.com/apache/spark/blob/e72d21c299a450e48b3cf6e5d36b8f3e9a568088/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L372-L375
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Production environment verification
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47733 from cxzl25/SPARK-49217.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java     |  8 ++++----
 .../main/scala/org/apache/spark/internal/config/package.scala  |  8 ++++++++
 docs/configuration.md                                          | 10 ++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 13fd18c0942b..ac9d335d6359 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -87,7 +87,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;
   private final int initialSortBufferSize;
-  private final int inputBufferSizeInBytes;
+  private final int mergeBufferSizeInBytes;
 
   @Nullable private MapStatus mapStatus;
   @Nullable private ShuffleExternalSorter sorter;
@@ -140,8 +140,8 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
     this.transferToEnabled = (boolean) 
sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
     this.initialSortBufferSize =
       (int) (long) 
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
-    this.inputBufferSizeInBytes =
-      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) 
* 1024;
+    this.mergeBufferSizeInBytes =
+      (int) (long) 
sparkConf.get(package$.MODULE$.SHUFFLE_FILE_MERGE_BUFFER_SIZE()) * 1024;
     open();
   }
 
@@ -372,7 +372,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
       for (int i = 0; i < spills.length; i++) {
         spillInputStreams[i] = new NioBufferedFileInputStream(
           spills[i].file,
-          inputBufferSizeInBytes);
+          mergeBufferSizeInBytes);
         // Only convert the partitionLengths when debug level is enabled.
         if (logger.isDebugEnabled()) {
           logger.debug("Partition lengths for mapId {} in Spill {}: {}", 
mapId, i,
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 940d72df5df6..e9e411cc56b5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1466,6 +1466,14 @@ package object config {
           s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
+  private[spark] val SHUFFLE_FILE_MERGE_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.file.merge.buffer")
+      .doc("Size of the in-memory buffer for each shuffle file input stream, 
in KiB unless " +
+        "otherwise specified. These buffers use off-heap buffers and are 
related to the number " +
+        "of files in the shuffle file. Too large buffers should be avoided.")
+      .version("4.0.0")
+      .fallbackConf(SHUFFLE_FILE_BUFFER_SIZE)
+
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
       .doc("(Deprecated since Spark 4.0, please use 
'spark.shuffle.localDisk.file.output.buffer'.)")
diff --git a/docs/configuration.md b/docs/configuration.md
index 2881660eded6..532da87f5626 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1029,6 +1029,16 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
   <td>1.4.0</td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.file.merge.buffer</code></td>
+  <td>32k</td>
+  <td>
+    Size of the in-memory buffer for each shuffle file input stream, in KiB 
unless otherwise
+    specified. These buffers use off-heap buffers and are related to the 
number of files in
+    the shuffle file. Too large buffers should be avoided.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
   <td>32k</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to