This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dd8b05f25fdc [SPARK-42252][CORE] Add
`spark.shuffle.localDisk.file.output.buffer` and deprecate
`spark.shuffle.unsafe.file.output.buffer`
dd8b05f25fdc is described below
commit dd8b05f25fdc2c964e351f4cbbf0dd474474783c
Author: wayneguow <[email protected]>
AuthorDate: Fri Jun 14 15:11:33 2024 +0800
[SPARK-42252][CORE] Add `spark.shuffle.localDisk.file.output.buffer` and
deprecate `spark.shuffle.unsafe.file.output.buffer`
### What changes were proposed in this pull request?
Deprecate spark.shuffle.unsafe.file.output.buffer and add a new config
spark.shuffle.localDisk.file.output.buffer instead.
### Why are the changes needed?
The old config is designed to be used in UnsafeShuffleWriter, but now it
has been used in all local shuffle writers through
LocalDiskShuffleMapOutputWriter, introduced by #25007.
### Does this PR introduce _any_ user-facing change?
The old config still works, but users are advised to use the new one.
### How was this patch tested?
Passed existing tests.
Closes #39819 from wayneguow/shuffle_output_buffer.
Authored-by: wayneguow <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java | 2 +-
core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +++-
.../scala/org/apache/spark/internal/config/package.scala | 10 ++++++++--
.../sort/io/LocalDiskShuffleMapOutputWriterSuite.scala | 2 +-
docs/configuration.md | 12 ++++++++++--
docs/core-migration-guide.md | 2 ++
6 files changed, 25 insertions(+), 7 deletions(-)
diff --git
a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 606bb625f5b2..c0b9018c770a 100644
---
a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -74,7 +74,7 @@ public class LocalDiskShuffleMapOutputWriter implements
ShuffleMapOutputWriter {
this.blockResolver = blockResolver;
this.bufferSize =
(int) (long) sparkConf.get(
- package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
+ package$.MODULE$.SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
this.partitionLengths = new long[numPartitions];
this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
this.outputTempFile = null;
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 95955455a9d4..cfb514913694 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -647,7 +647,9 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled",
"3.1.0",
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
- "Please open a JIRA ticket to report it if you need to use this
configuration.")
+ "Please open a JIRA ticket to report it if you need to use this
configuration."),
+ DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "4.0.0",
+ "Please use spark.shuffle.localDisk.file.output.buffer")
)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a7268c640991..9fcd9ba529c1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1463,8 +1463,7 @@ package object config {
private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
- .doc("The file system for this buffer size after each partition " +
- "is written in unsafe shuffle writer. In KiB unless otherwise
specified.")
+ .doc("(Deprecated since Spark 4.0, please use
'spark.shuffle.localDisk.file.output.buffer'.)")
.version("2.3.0")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
/ 1024,
@@ -1472,6 +1471,13 @@ package object config {
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")
+ private[spark] val SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE =
+ ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
+ .doc("The file system for this buffer size after each partition " +
+ "is written in all local disk shuffle writers. In KiB unless otherwise
specified.")
+ .version("4.0.0")
+ .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
+
private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size, in bytes, to use when writing the sorted records
to an on-disk file.")
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 3db752726256..7ab2cb864234 100644
---
a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -71,7 +71,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends
SparkFunSuite {
partitionSizesInMergedFile = null
conf = new SparkConf()
.set("spark.app.id", "example.spark.app")
- .set("spark.shuffle.unsafe.file.output.buffer", "16k")
+ .set("spark.shuffle.localDisk.file.output.buffer", "16k")
when(blockResolver.getDataFile(anyInt,
anyLong)).thenReturn(mergedOutputFile)
when(blockResolver.createTempFile(any(classOf[File])))
.thenAnswer { invocationOnMock =>
diff --git a/docs/configuration.md b/docs/configuration.md
index 23443cab2eac..6833d4e54fd0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1033,11 +1033,19 @@ Apart from these, the following properties are also
available, and may be useful
<td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
<td>32k</td>
<td>
- The file system for this buffer size after each partition is written in
unsafe shuffle writer.
- In KiB unless otherwise specified.
+ Deprecated since Spark 4.0, please use
<code>spark.shuffle.localDisk.file.output.buffer</code>.
</td>
<td>2.3.0</td>
</tr>
+<tr>
+ <td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
+ <td>32k</td>
+ <td>
+ The file system for this buffer size after each partition is written in
all local disk shuffle writers.
+ In KiB unless otherwise specified.
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>
<td>1024 * 1024</td>
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 1c37fded53ab..26b0ff32cf5d 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -50,6 +50,8 @@ license: |
- Since Spark 4.0, Spark performs speculative executions less aggressively
with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To
restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and
`spark.speculation.quantile=0.75`.
+- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated
though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
## Upgrading from Core 3.4 to 3.5
- Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is
deprecated. Use `spark.executor.failuresValidityInterval` instead.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]