Repository: spark
Updated Branches:
refs/heads/master 4482ff23a -> ecf437a64
[SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray
## What changes were proposed in this pull request?
`PickleException` is thrown when creating a DataFrame from a Python row that
contains an empty bytearray:

    spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(
        lambda x: {"abc": x.xx})).show()

    net.razorvine.pickle.PickleException: invalid pickle data for bytearray;
    expected 1 or 2 args, got 0
        at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
        ...

Pyrolite's `ByteArrayConstructor` does not handle the empty byte array pickled
by Python 3, so this patch overrides it to return an empty `Array[Byte]` when
the pickle data carries no constructor arguments.
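For context, the zero-argument call comes from CPython 3 itself: an empty
bytearray reduces to `(bytearray, ())`, leaving no constructor arguments for
Pyrolite to consume. A minimal sketch, runnable in plain Python 3 without
Spark:

```python
import pickle
import pickletools

# An empty bytearray reduces to a zero-argument constructor call...
print(bytearray(b'').__reduce_ex__(2))   # (<class 'bytearray'>, (), None)

# ...so the protocol-2 pickle stream applies bytearray() to an empty
# tuple. Pyrolite's ByteArrayConstructor expects 1 or 2 args, which is
# why the JVM side reports "expected 1 or 2 args, got 0".
pickletools.dis(pickle.dumps(bytearray(b''), protocol=2))
```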
## How was this patch tested?
Added a test.
Author: Liang-Chi Hsieh <[email protected]>
Closes #19085 from viirya/SPARK-21534.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf437a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf437a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf437a6
Branch: refs/heads/master
Commit: ecf437a64874a31328f4e28c6b24f37557fbe07d
Parents: 4482ff2
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu Aug 31 12:55:38 2017 +0900
Committer: hyukjinkwon <[email protected]>
Committed: Thu Aug 31 12:55:38 2017 +0900
----------------------------------------------------------------------
.../scala/org/apache/spark/api/python/SerDeUtil.scala | 14 ++++++++++++++
python/pyspark/sql/tests.py | 4 +++-
2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index aaf8e7a..01e64b6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -35,6 +35,16 @@ import org.apache.spark.rdd.RDD
 /** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[spark] object SerDeUtil extends Logging {
+  class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor {
+    override def construct(args: Array[Object]): Object = {
+      // Deal with an empty byte array pickled by Python 3.
+      if (args.length == 0) {
+        Array.emptyByteArray
+      } else {
+        super.construct(args)
+      }
+    }
+  }
   // Unpickle array.array generated by Python 2.6
   class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
     // /* Description of types */
@@ -108,6 +118,10 @@ private[spark] object SerDeUtil extends Logging {
     synchronized{
       if (!initialized) {
         Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+        Unpickler.registerConstructor("__builtin__", "bytearray", new ByteArrayConstructor())
+        Unpickler.registerConstructor("builtins", "bytearray", new ByteArrayConstructor())
+        Unpickler.registerConstructor("__builtin__", "bytes", new ByteArrayConstructor())
+        Unpickler.registerConstructor("_codecs", "encode", new ByteArrayConstructor())
         initialized = true
       }
     }
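A note on why four registrations are needed (my reading of CPython's pickling
behavior, not spelled out in the patch): Python 2 pickles `bytearray` under
the module name `__builtin__`, Python 3 uses `builtins`, and at pickle
protocol 2, which predates a native opcode for `bytes`, Python 3 routes
`bytes` through `_codecs.encode`; the `("__builtin__", "bytes")` entry would
similarly cover streams written by Python 2. The module names are easy to
confirm from Python 3:

```python
import pickle
import pickletools

# With the default fix_imports=True, protocol 2 writes the Python 2
# module name: GLOBAL '__builtin__ bytearray'.
pickletools.dis(pickle.dumps(bytearray(b'ab'), protocol=2))

# With fix_imports=False the Python 3 name appears instead:
# GLOBAL 'builtins bytearray'.
pickletools.dis(pickle.dumps(bytearray(b'ab'), protocol=2, fix_imports=False))

# bytes at protocol 2 are encoded through codecs.encode, which pickles
# under its real module: GLOBAL '_codecs encode'.
pickletools.dis(pickle.dumps(b'ab', protocol=2))
```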
http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1ecde68..b310285 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2383,9 +2383,11 @@ class SQLTests(ReusedPySparkTestCase):
     def test_BinaryType_serialization(self):
         # Pyrolite version <= 4.9 could not serialize BinaryType with Python3 SPARK-17808
+        # The empty bytearray is a test for SPARK-21534.
         schema = StructType([StructField('mybytes', BinaryType())])
         data = [[bytearray(b'here is my data')],
-                [bytearray(b'and here is some more')]]
+                [bytearray(b'and here is some more')],
+                [bytearray(b'')]]
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()
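As an end-to-end check, the repro from the description should now succeed. A
sketch, assuming a local `SparkSession` (any existing session works):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# unhex('') produces an empty binary value; before this patch the round
# trip failed in Pyrolite with "expected 1 or 2 args, got 0".
df = spark.createDataFrame(
    spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx}))
df.show()
```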