This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3e4cfe9 [SPARK-27406][SQL] UnsafeArrayData serialization breaks when two machines have different Oops size
3e4cfe9 is described below
commit 3e4cfe9dbcabc6c3d91b4ad0f0c5834bf5d740d6
Author: mingbo_pb <[email protected]>
AuthorDate: Tue Apr 9 15:41:42 2019 +0800
[SPARK-27406][SQL] UnsafeArrayData serialization breaks when two machines have different Oops size
## What changes were proposed in this pull request?
ApproxCountDistinctForIntervals holds an UnsafeArrayData that it uses to
initialize its endpoints. When that UnsafeArrayData is serialized with Java
serialization, the stored BYTE_ARRAY_OFFSET can differ between two machines
with different pointer widths (oops size in the JVM), so the deserialized
array is read at the wrong offset.
This PR fixes the issue the same way as
https://github.com/apache/spark/pull/9030 did for UnsafeRow: UnsafeArrayData
now implements Externalizable, serializing only its logical bytes and
recomputing the machine-specific offsets on the reading side.
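Below is a minimal, hypothetical sketch of that pattern (not Spark code;
`PayloadOnlyBuffer` and the fixed `LOCAL_BASE_OFFSET` are illustrative
stand-ins for UnsafeArrayData and Platform.BYTE_ARRAY_OFFSET): ship only the
logical bytes, and recompute the machine-specific base offset on the reading
side instead of deserializing it.

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Hypothetical sketch of the Externalizable pattern this PR applies. A real
// JVM derives the base offset from Unsafe.arrayBaseOffset(byte[].class),
// which differs between JVMs with different oops sizes; default Java
// serialization would ship the writer's offset and corrupt reads on a
// receiver with a different object layout.
public final class PayloadOnlyBuffer implements Externalizable {
  private static final long LOCAL_BASE_OFFSET = 16; // stand-in; JVM-specific in reality

  private byte[] bytes;    // the logical payload, safe to ship
  private long baseOffset; // machine-specific; must never be serialized
  private int numElements;

  public PayloadOnlyBuffer() {} // public no-arg constructor required by Externalizable

  public PayloadOnlyBuffer(byte[] bytes, int numElements) {
    this.bytes = bytes;
    this.baseOffset = LOCAL_BASE_OFFSET;
    this.numElements = numElements;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeInt(bytes.length); // ship sizes and payload only, no offsets
    out.writeInt(numElements);
    out.write(bytes);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    int size = in.readInt();    // read back in the same order it was written
    numElements = in.readInt();
    bytes = new byte[size];
    in.readFully(bytes);
    baseOffset = LOCAL_BASE_OFFSET; // recomputed locally, never deserialized
  }
}
```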
## How was this patch tested?
Manual tests have been run in our TPC-DS environment, and a corresponding
unit test case has been added as well.
Closes #24317 from pengbo/SPARK-27406.
Authored-by: mingbo_pb <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/expressions/UnsafeArrayData.java | 39 +++++++++++++++++++++-
.../spark/sql/catalyst/util/UnsafeArraySuite.scala | 17 +++++++++-
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index 04fc986..4ff0838 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -17,6 +17,10 @@
package org.apache.spark.sql.catalyst.expressions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -30,6 +34,8 @@ import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
+
/**
* An Unsafe implementation of Array which is backed by raw memory instead of Java objects.
*
@@ -52,7 +58,7 @@ import org.apache.spark.unsafe.types.UTF8String;
* Instances of `UnsafeArrayData` act as pointers to row data stored in this format.
*/
-public final class UnsafeArrayData extends ArrayData {
+public final class UnsafeArrayData extends ArrayData implements Externalizable {
public static int calculateHeaderPortionInBytes(int numFields) {
return (int)calculateHeaderPortionInBytes((long)numFields);
}
@@ -485,4 +491,35 @@ public final class UnsafeArrayData extends ArrayData {
public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
return fromPrimitiveArray(arr, Platform.DOUBLE_ARRAY_OFFSET, arr.length, 8);
}
+
+
+ public byte[] getBytes() {
+ if (baseObject instanceof byte[]
+ && baseOffset == Platform.BYTE_ARRAY_OFFSET
+ && (((byte[]) baseObject).length == sizeInBytes)) {
+ return (byte[]) baseObject;
+ } else {
+ byte[] bytes = new byte[sizeInBytes];
+ Platform.copyMemory(baseObject, baseOffset, bytes, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
+ return bytes;
+ }
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ byte[] bytes = getBytes();
+ out.writeInt(bytes.length);
+ out.writeInt(this.numElements);
+ out.write(bytes);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ this.baseOffset = BYTE_ARRAY_OFFSET;
+ this.sizeInBytes = in.readInt();
+ this.numElements = in.readInt();
+ this.elementOffset = baseOffset + calculateHeaderPortionInBytes(this.numElements);
+ this.baseObject = new byte[sizeInBytes];
+ in.readFully((byte[]) baseObject);
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
index 29a2bcd..db25d2f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.catalyst.util
import java.time.ZoneId
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class UnsafeArraySuite extends SparkFunSuite {
@@ -210,4 +212,17 @@ class UnsafeArraySuite extends SparkFunSuite {
val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
assert(doubleEncoder.toRow(doubleArray).getArray(0).toDoubleArray.sameElements(doubleArray))
}
+
+ test("unsafe java serialization") {
+ val offset = 32
+ val data = new Array[Byte](1024)
+ Platform.putLong(data, offset, 1)
+ val arrayData = new UnsafeArrayData()
+ arrayData.pointTo(data, offset, data.length)
+ arrayData.setLong(0, 19285)
+ val ser = new JavaSerializer(new SparkConf).newInstance()
+ val arrayDataSer = ser.deserialize[UnsafeArrayData](ser.serialize(arrayData))
+ assert(arrayDataSer.getLong(0) == 19285)
+ assert(arrayDataSer.getBaseObject.asInstanceOf[Array[Byte]].length == 1024)
+ }
}
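As a side note on reproducing the underlying layout difference: Spark's
Platform obtains sun.misc.Unsafe reflectively and derives BYTE_ARRAY_OFFSET
from Unsafe.arrayBaseOffset(byte[].class), so a tiny standalone probe (my own
sketch, not part of this patch) can print the value a given JVM would use.
Running it with and without -XX:-UseCompressedOops on a 64-bit JVM should
print different offsets, which is exactly the machine-to-machine mismatch
described above.

```java
import java.lang.reflect.Field;

import sun.misc.Unsafe;

// Standalone probe (not part of this patch): prints this JVM's byte[] base
// offset, the value Spark's Platform.BYTE_ARRAY_OFFSET is derived from.
// Compare a run with -XX:+UseCompressedOops against -XX:-UseCompressedOops
// to see the layout difference behind SPARK-27406.
public final class BaseOffsetProbe {
  public static void main(String[] args) throws Exception {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true); // theUnsafe is private; Platform uses the same trick
    Unsafe unsafe = (Unsafe) f.get(null);
    System.out.println("byte[] base offset: " + unsafe.arrayBaseOffset(byte[].class));
  }
}
```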