This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 47991b074a5 [SPARK-38798][CORE] Make `spark.file.transferTo` a
`ConfigEntry`
47991b074a5 is described below
commit 47991b074a5a277e1fb75be3a5cc207f400b0b0c
Author: yangjie01 <[email protected]>
AuthorDate: Fri Apr 8 11:26:15 2022 +0900
[SPARK-38798][CORE] Make `spark.file.transferTo` a `ConfigEntry`
### What changes were proposed in this pull request?
This PR makes `spark.file.transferTo` a `ConfigEntry` and moves it into
`org.apache.spark.internal.config`.
### Why are the changes needed?
Use a `ConfigEntry` instead of a string literal.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA
Closes #36079 from LuciferYang/transferTo-config.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 2 +-
.../java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 2 +-
.../src/main/scala/org/apache/spark/internal/config/package.scala | 8 ++++++++
.../org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 3 ++-
.../spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 3 ++-
5 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index da7a51854cc..d067c870acc 100644
---
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -118,7 +118,7 @@ final class BypassMergeSortShuffleWriter<K, V>
ShuffleExecutorComponents shuffleExecutorComponents) throws
SparkException {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no
units are provided
this.fileBufferSize = (int) (long)
conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
- this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
+ this.transferToEnabled = (boolean)
conf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
diff --git
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index b1779a135b7..9c541841059 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -135,7 +135,7 @@ public class UnsafeShuffleWriter<K, V> extends
ShuffleWriter<K, V> {
this.shuffleExecutorComponents = shuffleExecutorComponents;
this.taskContext = taskContext;
this.sparkConf = sparkConf;
- this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo",
true);
+ this.transferToEnabled = (boolean)
sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
this.initialSortBufferSize =
(int) (long)
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
this.inputBufferSizeInBytes =
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aa8f63e14ef..b67250b7b84 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1494,6 +1494,14 @@ package object config {
.longConf
.createWithDefault(10000)
+ private[spark] val SHUFFLE_MERGE_PREFER_NIO =
+ ConfigBuilder("spark.file.transferTo")
+ .doc("If true, NIO's `transferTo` API will be preferentially used when
merging " +
+ "Spark shuffle spill files")
+ .version("1.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
private[spark] val SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD =
ConfigBuilder("spark.shuffle.sort.bypassMergeThreshold")
.doc("In the sort-based shuffle manager, avoid merge-sorting data if
there is no " +
diff --git
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 8a3df5a9d09..a865e79077a 100644
---
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -181,7 +181,8 @@ public class UnsafeShuffleWriterSuite implements
ShuffleChecksumTestHelper {
private UnsafeShuffleWriter<Object, Object> createWriter(
boolean transferToEnabled,
IndexShuffleBlockResolver blockResolver) throws SparkException {
- conf.set("spark.file.transferTo", String.valueOf(transferToEnabled));
+ conf.set(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO().key(),
+ String.valueOf(transferToEnabled));
return new UnsafeShuffleWriter<>(
blockManager,
taskMemoryManager,
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 83bd3b0a997..ce2aefa7422 100644
---
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -169,7 +169,8 @@ class BypassMergeSortShuffleWriterSuite
Seq(true, false).foreach { transferTo =>
test(s"write with some empty partitions - transferTo $transferTo") {
- val transferConf = conf.clone.set("spark.file.transferTo",
transferTo.toString)
+ val transferConf =
+ conf.clone.set(config.SHUFFLE_MERGE_PREFER_NIO.key,
transferTo.toString)
def records: Iterator[(Int, Int)] =
Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
val writer = new BypassMergeSortShuffleWriter[Int, Int](
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]