Repository: spark Updated Branches: refs/heads/master 463a67668 -> 6c9e5ac9d
[SPARK-25776][CORE]The disk write buffer size must be greater than 12 ## What changes were proposed in this pull request? In `UnsafeSorterSpillWriter.java`, when we write a record to a spill file with ` void write(Object baseObject, long baseOffset, int recordLength, long keyPrefix)`, `recordLength` and `keyPrefix` will be written to the disk write buffer first, and these will take 12 bytes, so the disk write buffer size must be greater than 12. If `diskWriteBufferSize` is 10, it will print this exception info: _java.lang.ArrayIndexOutOfBoundsException: 10 at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.writeLongToBuffer (UnsafeSorterSpillWriter.java:91) at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:123) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222) at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:65)_ ## How was this patch tested? Existing UT in `UnsafeExternalSorterSuite` Closes #22754 from 10110346/diskWriteBufferSize. 
Authored-by: liuxian <liu.xi...@zte.com.cn> Signed-off-by: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c9e5ac9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c9e5ac9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c9e5ac9 Branch: refs/heads/master Commit: 6c9e5ac9de3d0ae5ea86b768608b42b5feb46df4 Parents: 463a676 Author: liuxian <liu.xi...@zte.com.cn> Authored: Mon Nov 5 01:55:13 2018 +0900 Committer: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Committed: Mon Nov 5 01:55:13 2018 +0900 ---------------------------------------------------------------------- .../util/collection/unsafe/sort/UnsafeSorterSpillWriter.java | 5 ++++- .../main/scala/org/apache/spark/internal/config/package.scala | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6c9e5ac9/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java index 9399024..c1d71a2 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java @@ -42,7 +42,10 @@ public final class UnsafeSorterSpillWriter { private final SparkConf conf = new SparkConf(); - /** The buffer size to use when writing the sorted records to an on-disk file */ + /** + * The buffer size to use when writing the sorted records to an on-disk file, and + * this space used by prefix + len + recordLength must be greater than 4 + 8 bytes. 
+ */ private final int diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE()); http://git-wip-us.apache.org/repos/asf/spark/blob/6c9e5ac9/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 034e5eb..c8993e1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.ByteUnit +import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.util.Utils package object config { @@ -504,8 +505,9 @@ package object config { ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize") .doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.") .bytesConf(ByteUnit.BYTE) - .checkValue(v => v > 0 && v <= Int.MaxValue, - s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.") + .checkValue(v => v > 12 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, + s"The buffer size must be greater than 12 and less than or equal to " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") .createWithDefault(1024 * 1024) private[spark] val UNROLL_MEMORY_CHECK_PERIOD = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org