Repository: spark Updated Branches: refs/heads/master be2238fb5 -> 02d0a1ffd
[SPARK-25069][CORE] Using UnsafeAlignedOffset to make the entire record of 8 byte Items aligned like which is used in UnsafeExternalSorter ## What changes were proposed in this pull request? The class of UnsafeExternalSorter used UnsafeAlignedOffset to make the entire record of 8 byte Items aligned, but ShuffleExternalSorter not. The SPARC platform requires this because using a 4 byte Int for record lengths causes the entire record of 8 byte Items to become misaligned by 4 bytes. Using a 8 byte long for record length keeps things 8 byte aligned. ## How was this patch tested? Existing Test. Closes #22053 from eatoncys/UnsafeAlignedOffset. Authored-by: 10129659 <chen.yans...@zte.com.cn> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02d0a1ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02d0a1ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02d0a1ff Branch: refs/heads/master Commit: 02d0a1ffd9adb4bf898905095318eb099ed1807f Parents: be2238f Author: 10129659 <chen.yans...@zte.com.cn> Authored: Mon Aug 13 09:09:25 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Mon Aug 13 09:09:25 2018 +0800 ---------------------------------------------------------------------- .../spark/shuffle/sort/ShuffleExternalSorter.java | 15 +++++++++------ .../collection/unsafe/sort/UnsafeExternalSorter.java | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/02d0a1ff/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index c3a07b2..c7d2db4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -43,6 +43,7 @@ import org.apache.spark.storage.DiskBlockObjectWriter; import org.apache.spark.storage.FileSegment; import org.apache.spark.storage.TempShuffleBlockId; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.UnsafeAlignedOffset; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.Utils; @@ -184,6 +185,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); int currentPartition = -1; + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); while (sortedRecords.hasNext()) { sortedRecords.loadNext(); final int partition = sortedRecords.packedRecordPointer.getPartitionId(); @@ -200,8 +202,8 @@ final class ShuffleExternalSorter extends MemoryConsumer { final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); final Object recordPage = taskMemoryManager.getPage(recordPointer); final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); - int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage); - long recordReadPosition = recordOffsetInPage + 4; // skip over record length + int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); + long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); Platform.copyMemory( @@ -389,15 +391,16 @@ final class ShuffleExternalSorter extends MemoryConsumer { } growPointerArrayIfNecessary(); - // Need 4 bytes to store the record length. - final int required = length + 4; + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + // Need 4 or 8 bytes to store the record length. + final int required = length + uaoSize; acquireNewPageIfNecessary(required); assert(currentPage != null); final Object base = currentPage.getBaseObject(); final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); - Platform.putInt(base, pageCursor, length); - pageCursor += 4; + UnsafeAlignedOffset.putSize(base, pageCursor, length); + pageCursor += uaoSize; Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length); pageCursor += length; inMemSorter.insertRecord(recordAddress, partitionId); http://git-wip-us.apache.org/repos/asf/spark/blob/02d0a1ff/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 4fc19b1..399251b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -402,7 +402,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { growPointerArrayIfNecessary(); int uaoSize = UnsafeAlignedOffset.getUaoSize(); - // Need 4 bytes to store the record length. + // Need 4 or 8 bytes to store the record length. final int required = length + uaoSize; acquireNewPageIfNecessary(required); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org