Repository: spark
Updated Branches:
  refs/heads/master be2238fb5 -> 02d0a1ffd


[SPARK-25069][CORE] Using UnsafeAlignedOffset to make the entire record of 8 
byte Items aligned like which is used in UnsafeExternalSorter

## What changes were proposed in this pull request?

The class of UnsafeExternalSorter used UnsafeAlignedOffset to make the entire 
record of 8 byte Items aligned, but ShuffleExternalSorter not.
The SPARC platform requires this because using a 4 byte Int for record lengths 
causes the entire record of 8 byte Items to become misaligned by 4 bytes. Using 
a 8 byte long for record length keeps things 8 byte aligned.

## How was this patch tested?
Existing Test.

Closes #22053 from eatoncys/UnsafeAlignedOffset.

Authored-by: 10129659 <chen.yans...@zte.com.cn>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02d0a1ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02d0a1ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02d0a1ff

Branch: refs/heads/master
Commit: 02d0a1ffd9adb4bf898905095318eb099ed1807f
Parents: be2238f
Author: 10129659 <chen.yans...@zte.com.cn>
Authored: Mon Aug 13 09:09:25 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Aug 13 09:09:25 2018 +0800

----------------------------------------------------------------------
 .../spark/shuffle/sort/ShuffleExternalSorter.java    | 15 +++++++++------
 .../collection/unsafe/sort/UnsafeExternalSorter.java |  2 +-
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02d0a1ff/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index c3a07b2..c7d2db4 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -43,6 +43,7 @@ import org.apache.spark.storage.DiskBlockObjectWriter;
 import org.apache.spark.storage.FileSegment;
 import org.apache.spark.storage.TempShuffleBlockId;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
@@ -184,6 +185,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, 
writeMetricsToUse);
 
     int currentPartition = -1;
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
     while (sortedRecords.hasNext()) {
       sortedRecords.loadNext();
       final int partition = sortedRecords.packedRecordPointer.getPartitionId();
@@ -200,8 +202,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
       final Object recordPage = taskMemoryManager.getPage(recordPointer);
       final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
-      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
-      long recordReadPosition = recordOffsetInPage + 4; // skip over record 
length
+      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
+      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
       while (dataRemaining > 0) {
         final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
         Platform.copyMemory(
@@ -389,15 +391,16 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     }
 
     growPointerArrayIfNecessary();
-    // Need 4 bytes to store the record length.
-    final int required = length + 4;
+    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    // Need 4 or 8 bytes to store the record length.
+    final int required = length + uaoSize;
     acquireNewPageIfNecessary(required);
 
     assert(currentPage != null);
     final Object base = currentPage.getBaseObject();
     final long recordAddress = 
taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
-    Platform.putInt(base, pageCursor, length);
-    pageCursor += 4;
+    UnsafeAlignedOffset.putSize(base, pageCursor, length);
+    pageCursor += uaoSize;
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
     inMemSorter.insertRecord(recordAddress, partitionId);

http://git-wip-us.apache.org/repos/asf/spark/blob/02d0a1ff/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4fc19b1..399251b 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -402,7 +402,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
 
     growPointerArrayIfNecessary();
     int uaoSize = UnsafeAlignedOffset.getUaoSize();
-    // Need 4 bytes to store the record length.
+    // Need 4 or 8 bytes to store the record length.
     final int required = length + uaoSize;
     acquireNewPageIfNecessary(required);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to