Repository: spark
Updated Branches:
  refs/heads/master 95e372141 -> 8a1ce4899


[SPARK-13745] [SQL] Support columnar in memory representation on Big Endian 
platforms

## What changes were proposed in this pull request?

Parquet datasource and ColumnarBatch tests fail on big-endian platforms. This 
patch adds support for correctly interpreting little-endian byte arrays on a 
big-endian platform.

## How was this patch tested?

Spark test builds were run on big-endian z/Linux, and a regression build was 
run on little-endian amd64.

Author: Pete Robbins <[email protected]>

Closes #12397 from robbinspg/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a1ce489
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a1ce489
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a1ce489

Branch: refs/heads/master
Commit: 8a1ce4899fb9f751dedaaa34ea654dfbc8330852
Parents: 95e37214
Author: Pete Robbins <[email protected]>
Authored: Mon May 2 13:16:46 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Mon May 2 13:16:46 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedPlainValuesReader.java    | 28 +++++++++-
 .../vectorized/OffHeapColumnVector.java         | 54 ++++++++++++++++----
 .../vectorized/OnHeapColumnVector.java          | 41 +++++++++++----
 .../vectorized/ColumnarBatchSuite.scala         |  9 ++++
 4 files changed, 110 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a1ce489/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 2672e04..9475c85 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.unsafe.Platform;
@@ -31,6 +33,9 @@ public class VectorizedPlainValuesReader extends ValuesReader 
implements Vectori
   private byte[] buffer;
   private int offset;
   private int bitOffset; // Only used for booleans.
+  private ByteBuffer byteBuffer; // used to wrap the byte array buffer
+  
+  private final static boolean bigEndianPlatform = 
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
   public VectorizedPlainValuesReader() {
   }
@@ -39,6 +44,9 @@ public class VectorizedPlainValuesReader extends ValuesReader 
implements Vectori
   public void initFromPage(int valueCount, byte[] bytes, int offset) throws 
IOException {
     this.buffer = bytes;
     this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
+    if (bigEndianPlatform) {
+      byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+    }
   }
 
   @Override
@@ -103,6 +111,9 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
   @Override
   public final int readInteger() {
     int v = Platform.getInt(buffer, offset);
+    if (bigEndianPlatform) {
+      v = java.lang.Integer.reverseBytes(v);
+    }
     offset += 4;
     return v;
   }
@@ -110,6 +121,9 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
   @Override
   public final long readLong() {
     long v = Platform.getLong(buffer, offset);
+    if (bigEndianPlatform) {
+      v = java.lang.Long.reverseBytes(v);
+    }
     offset += 8;
     return v;
   }
@@ -121,14 +135,24 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
 
   @Override
   public final float readFloat() {
-    float v = Platform.getFloat(buffer, offset);
+    float v;
+    if (!bigEndianPlatform) {
+      v = Platform.getFloat(buffer, offset);
+    } else {
+      v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
+    }
     offset += 4;
     return v;
   }
 
   @Override
   public final double readDouble() {
-    double v = Platform.getDouble(buffer, offset);
+    double v;
+    if (!bigEndianPlatform) {
+      v = Platform.getDouble(buffer, offset);
+    } else {
+      v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
+    }
     offset += 8;
     return v;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a1ce489/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index b190141..b8dd162 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import org.apache.commons.lang.NotImplementedException;
@@ -28,6 +29,9 @@ import org.apache.spark.unsafe.Platform;
  * Column data backed using offheap memory.
  */
 public final class OffHeapColumnVector extends ColumnVector {
+  
+  private final static boolean bigEndianPlatform = 
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+  
   // The data stored in these two allocations need to maintain binary 
compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
@@ -39,9 +43,7 @@ public final class OffHeapColumnVector extends ColumnVector {
 
   protected OffHeapColumnVector(int capacity, DataType type) {
     super(capacity, type, MemoryMode.OFF_HEAP);
-    if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
-      throw new NotImplementedException("Only little endian is supported.");
-    }
+
     nulls = 0;
     data = 0;
     lengthData = 0;
@@ -221,8 +223,16 @@ public final class OffHeapColumnVector extends 
ColumnVector {
 
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
-    Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-        null, data + 4 * rowId, count * 4);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+          null, data + 4 * rowId, count * 4);
+    } else {
+      int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+      long offset = data + 4 * rowId;
+      for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
+        Platform.putInt(null, offset, 
java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
+      }
+    }
   }
 
   @Override
@@ -259,8 +269,16 @@ public final class OffHeapColumnVector extends 
ColumnVector {
 
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
-    Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
-        null, data + 8 * rowId, count * 8);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+          null, data + 8 * rowId, count * 8);
+    } else {
+      int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+      long offset = data + 8 * rowId;
+      for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
+        Platform.putLong(null, offset, 
java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
+      }
+    }
   }
 
   @Override
@@ -297,8 +315,16 @@ public final class OffHeapColumnVector extends 
ColumnVector {
 
   @Override
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        null, data + rowId * 4, count * 4);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+          null, data + rowId * 4, count * 4);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 4 * rowId;
+      for (int i = 0; i < count; ++i, offset += 4) {
+        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+      }
+    }
   }
 
   @Override
@@ -336,8 +362,16 @@ public final class OffHeapColumnVector extends 
ColumnVector {
 
   @Override
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
         null, data + rowId * 8, count * 8);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 8 * rowId;
+      for (int i = 0; i < count; ++i, offset += 8) {
+        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/8a1ce489/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index e972768..b1ffe4c 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 
 import org.apache.spark.memory.MemoryMode;
@@ -27,6 +29,9 @@ import org.apache.spark.unsafe.Platform;
  * and a java array for the values.
  */
 public final class OnHeapColumnVector extends ColumnVector {
+  
+  private final static boolean bigEndianPlatform = 
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+  
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
 
@@ -211,10 +216,11 @@ public final class OnHeapColumnVector extends 
ColumnVector {
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-    for (int i = 0; i < count; ++i) {
+    for (int i = 0; i < count; ++i, srcOffset += 4) {
       intData[i + rowId] = Platform.getInt(src, srcOffset);
-      srcIndex += 4;
-      srcOffset += 4;
+      if (bigEndianPlatform) {
+        intData[i + rowId] = java.lang.Integer.reverseBytes(intData[i + 
rowId]);
+      }
     }
   }
 
@@ -251,10 +257,11 @@ public final class OnHeapColumnVector extends 
ColumnVector {
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
-    for (int i = 0; i < count; ++i) {
+    for (int i = 0; i < count; ++i, srcOffset += 8) {
       longData[i + rowId] = Platform.getLong(src, srcOffset);
-      srcIndex += 8;
-      srcOffset += 8;
+      if (bigEndianPlatform) {
+        longData[i + rowId] = java.lang.Long.reverseBytes(longData[i + rowId]);
+      }
     }
   }
 
@@ -286,8 +293,15 @@ public final class OnHeapColumnVector extends ColumnVector 
{
 
   @Override
   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
-        floatData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, 
floatData,
+          Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+      }
+    }
   }
 
   @Override
@@ -320,8 +334,15 @@ public final class OnHeapColumnVector extends ColumnVector 
{
 
   @Override
   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
-        Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+    if (!bigEndianPlatform) {
+      Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, 
doubleData,
+          Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/8a1ce489/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index a63007f..7e576a8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.vectorized
 
 import java.nio.charset.StandardCharsets
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -280,6 +282,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
 
+      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
+        // Ensure array contains Little Endian doubles
+        var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
+        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
+        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 
bb.getDouble(8))
+      }
+
       column.putDoubles(idx, 1, buffer, 8)
       column.putDoubles(idx + 1, 1, buffer, 0)
       reference += 1.123


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to