This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd8b05f25fdc [SPARK-42252][CORE] Add `spark.shuffle.localDisk.file.output.buffer` and deprecate `spark.shuffle.unsafe.file.output.buffer`
dd8b05f25fdc is described below

commit dd8b05f25fdc2c964e351f4cbbf0dd474474783c
Author: wayneguow <[email protected]>
AuthorDate: Fri Jun 14 15:11:33 2024 +0800

    [SPARK-42252][CORE] Add `spark.shuffle.localDisk.file.output.buffer` and deprecate `spark.shuffle.unsafe.file.output.buffer`
    
    ### What changes were proposed in this pull request?
    Deprecate spark.shuffle.unsafe.file.output.buffer and add a new config, spark.shuffle.localDisk.file.output.buffer, to replace it.
    
    ### Why are the changes needed?
    The old config was designed for use in UnsafeShuffleWriter, but it is now used by all local shuffle writers through LocalDiskShuffleMapOutputWriter, introduced in #25007.
    
    ### Does this PR introduce _any_ user-facing change?
    The old config still works but is deprecated; users are advised to use the new one.
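    
    A minimal, hypothetical usage sketch (not part of this patch; the buffer value below is made up) showing the migration path. The new key is preferred, and the deprecated key keeps working because it backs the new entry as a fallback:
    
    ```scala
    import org.apache.spark.SparkConf
    
    val conf = new SparkConf()
      .set("spark.shuffle.localDisk.file.output.buffer", "64k") // new key, preferred since 4.0
      // .set("spark.shuffle.unsafe.file.output.buffer", "64k") // deprecated key, still honored
    ```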
    
    ### How was this patch tested?
    Passed existing tests.
    
    Closes #39819 from wayneguow/shuffle_output_buffer.
    
    Authored-by: wayneguow <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java     |  2 +-
 core/src/main/scala/org/apache/spark/SparkConf.scala         |  4 +++-
 .../scala/org/apache/spark/internal/config/package.scala     | 10 ++++++++--
 .../sort/io/LocalDiskShuffleMapOutputWriterSuite.scala       |  2 +-
 docs/configuration.md                                        | 12 ++++++++++--
 docs/core-migration-guide.md                                 |  2 ++
 6 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 606bb625f5b2..c0b9018c770a 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -74,7 +74,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
     this.blockResolver = blockResolver;
     this.bufferSize =
       (int) (long) sparkConf.get(
-        package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
+        package$.MODULE$.SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     this.partitionLengths = new long[numPartitions];
     this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
     this.outputTempFile = null;
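
An illustrative aside on the sizing arithmetic above (the names below are made up, not from the patch): the config entry is declared in KiB, so the writer multiplies the configured value by 1024 to size the buffered output stream in bytes.

```scala
// Sketch of the conversion done in LocalDiskShuffleMapOutputWriter:
val configuredKiB: Long = 32L                         // default "32k" -> 32 KiB
val bufferSizeBytes: Int = configuredKiB.toInt * 1024 // 32768-byte stream buffer
```
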
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 95955455a9d4..cfb514913694 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -647,7 +647,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
         "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
       DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
-        "Please open a JIRA ticket to report it if you need to use this 
configuration.")
+        "Please open a JIRA ticket to report it if you need to use this 
configuration."),
+      DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "4.0.0",
+        "Please use spark.shuffle.localDisk.file.output.buffer")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a7268c640991..9fcd9ba529c1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1463,8 +1463,7 @@ package object config {
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
-      .doc("The file system for this buffer size after each partition " +
-        "is written in unsafe shuffle writer. In KiB unless otherwise 
specified.")
+      .doc("(Deprecated since Spark 4.0, please use 
'spark.shuffle.localDisk.file.output.buffer'.)")
       .version("2.3.0")
       .bytesConf(ByteUnit.KiB)
       .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
@@ -1472,6 +1471,13 @@ package object config {
           s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
+  private[spark] val SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
+      .doc("The file system for this buffer size after each partition " +
+        "is written in all local disk shuffle writers. In KiB unless otherwise 
specified.")
+      .version("4.0.0")
+      .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
+
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
       .doc("The buffer size, in bytes, to use when writing the sorted records 
to an on-disk file.")
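
An illustrative sketch of what `fallbackConf` buys here (Spark-internal perspective; not part of the patch): reading the new entry falls back to the deprecated key when only the old key is set, so existing jobs keep their tuned value.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

// Hypothetical scenario: a job still sets only the deprecated key.
val conf = new SparkConf()
  .set("spark.shuffle.unsafe.file.output.buffer", "16k")

// The new entry falls back to the deprecated one, so this yields 16 (KiB),
// not the 32k default. (Reading a ConfigEntry like this is private[spark].)
val sizeKiB: Long = conf.get(SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE)
```
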
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 3db752726256..7ab2cb864234 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -71,7 +71,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite {
     partitionSizesInMergedFile = null
     conf = new SparkConf()
       .set("spark.app.id", "example.spark.app")
-      .set("spark.shuffle.unsafe.file.output.buffer", "16k")
+      .set("spark.shuffle.localDisk.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
     when(blockResolver.createTempFile(any(classOf[File])))
       .thenAnswer { invocationOnMock =>
diff --git a/docs/configuration.md b/docs/configuration.md
index 23443cab2eac..6833d4e54fd0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1033,11 +1033,19 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
   <td>32k</td>
   <td>
-    The file system for this buffer size after each partition is written in unsafe shuffle writer.
-    In KiB unless otherwise specified.
+    Deprecated since Spark 4.0, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
+  <td>32k</td>
+  <td>
+    The buffer size to use when writing shuffle output files in all local disk shuffle writers.
+    In KiB unless otherwise specified.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>
   <td>1024 * 1024</td>
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 1c37fded53ab..26b0ff32cf5d 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -50,6 +50,8 @@ license: |
 
 - Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
 
+- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated, though it still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
 ## Upgrading from Core 3.4 to 3.5
 
 - Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
