Repository: spark
Updated Branches:
  refs/heads/master 463a67668 -> 6c9e5ac9d


[SPARK-25776][CORE]The disk write buffer size must be greater than 12

## What changes were proposed in this pull request?

 In `UnsafeSorterSpillWriter.java`, when we write a record to a spill file wtih 
` void write(Object baseObject, long baseOffset,  int recordLength, long 
keyPrefix)`, `recordLength` and `keyPrefix`  will be  written  the disk write 
buffer  first, and these will take 12 bytes, so the disk write buffer size must 
be greater than 12.

 If `diskWriteBufferSize` is  10, it will print this exception info:

_java.lang.ArrayIndexOutOfBoundsException: 10
   at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.writeLongToBuffer
 (UnsafeSorterSpillWriter.java:91)
        at 
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:123)
        at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
        at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
        at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:65)_

## How was this patch tested?
Existing UT in `UnsafeExternalSorterSuite`

Closes #22754 from 10110346/diskWriteBufferSize.

Authored-by: liuxian <liu.xi...@zte.com.cn>
Signed-off-by: Kazuaki Ishizaki <ishiz...@jp.ibm.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c9e5ac9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c9e5ac9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c9e5ac9

Branch: refs/heads/master
Commit: 6c9e5ac9de3d0ae5ea86b768608b42b5feb46df4
Parents: 463a676
Author: liuxian <liu.xi...@zte.com.cn>
Authored: Mon Nov 5 01:55:13 2018 +0900
Committer: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Committed: Mon Nov 5 01:55:13 2018 +0900

----------------------------------------------------------------------
 .../util/collection/unsafe/sort/UnsafeSorterSpillWriter.java   | 5 ++++-
 .../main/scala/org/apache/spark/internal/config/package.scala  | 6 ++++--
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c9e5ac9/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 9399024..c1d71a2 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -42,7 +42,10 @@ public final class UnsafeSorterSpillWriter {
 
   private final SparkConf conf = new SparkConf();
 
-  /** The buffer size to use when writing the sorted records to an on-disk 
file */
+  /**
+   * The buffer size to use when writing the sorted records to an on-disk 
file, and
+   * this space used by prefix + len + recordLength must be greater than 4 + 8 
bytes.
+   */
   private final int diskWriteBufferSize =
     (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9e5ac9/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 034e5eb..c8993e1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
 package object config {
@@ -504,8 +505,9 @@ package object config {
     ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
       .doc("The buffer size, in bytes, to use when writing the sorted records 
to an on-disk file.")
       .bytesConf(ByteUnit.BYTE)
-      .checkValue(v => v > 0 && v <= Int.MaxValue,
-        s"The buffer size must be greater than 0 and less than 
${Int.MaxValue}.")
+      .checkValue(v => v > 12 && v <= 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+        s"The buffer size must be greater than 12 and less than or equal to " +
+          s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
       .createWithDefault(1024 * 1024)
 
   private[spark] val UNROLL_MEMORY_CHECK_PERIOD =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to