Repository: spark
Updated Branches:
refs/heads/master 0b35fcd7f -> 453d7999b
[SPARK-5361]Multiple Java RDD <-> Python RDD conversions not working correctly
This is found through reading RDD from `sc.newAPIHadoopRDD` and writing it back
using `rdd.saveAsNewAPIHadoopFile` in pyspark.
It turns out that whenever there are multiple RDD conversions from JavaRDD to
PythonRDD then back to JavaRDD, the exception below happens:
```
15/01/16 10:28:31 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to
java.util.ArrayList
at
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:157)
at
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
```
The test case code below reproduces it:
```
from pyspark.rdd import RDD
dl = [
(u'2', {u'director': u'David Lean'}),
(u'7', {u'director': u'Andrew Dominik'})
]
dl_rdd = sc.parallelize(dl)
tmp = dl_rdd._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()
tmp = t._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count() # it blows up here during the 2nd time of conversion
```
Author: Winston Chen <[email protected]>
Closes #4146 from wingchen/master and squashes the following commits:
903df7d [Winston Chen] SPARK-5361, update to toSeq based on the PR
5d90a83 [Winston Chen] SPARK-5361, make python pretty, so to pass PEP 8 checks
126be6b [Winston Chen] SPARK-5361, add in test case
4cf1187 [Winston Chen] SPARK-5361, add in test case
9f1a097 [Winston Chen] add in tuple handling while converting from python RDD
back to JavaRDD
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/453d7999
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/453d7999
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/453d7999
Branch: refs/heads/master
Commit: 453d7999b88be87bda30d9e73038eb484ee063bd
Parents: 0b35fcd
Author: Winston Chen <[email protected]>
Authored: Wed Jan 28 11:08:44 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Jan 28 11:08:44 2015 -0800
----------------------------------------------------------------------
.../spark/api/python/PythonHadoopUtil.scala | 5 +++++
.../org/apache/spark/api/python/SerDeUtil.scala | 5 ++++-
python/pyspark/tests.py | 19 +++++++++++++++++++
3 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/453d7999/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git
a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 5ba6617..c9181a2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -138,6 +138,11 @@ private[python] class JavaToWritableConverter extends
Converter[Any, Writable] {
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
+ case array: Array[Any] => {
+ val arrayWriteable = new ArrayWritable(classOf[Writable])
+ arrayWriteable.set(array.map(convertToWritable(_)))
+ arrayWriteable
+ }
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/453d7999/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index a4153aa..19ca2bb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -153,7 +153,10 @@ private[spark] object SerDeUtil extends Logging {
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
- obj.asInstanceOf[JArrayList[_]].asScala
+ obj match {
+ case array: Array[Any] => array.toSeq
+ case _ => obj.asInstanceOf[JArrayList[_]].asScala
+ }
} else {
Seq(obj)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/453d7999/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e8e207a..e694ffc 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -714,6 +714,25 @@ class RDDTests(ReusedPySparkTestCase):
wr_s21 = rdd.sample(True, 0.4, 21).collect()
self.assertNotEqual(set(wr_s11), set(wr_s21))
+ def test_multiple_python_java_RDD_conversions(self):
+ # Regression test for SPARK-5361
+ data = [
+ (u'1', {u'director': u'David Lean'}),
+ (u'2', {u'director': u'Andrew Dominik'})
+ ]
+ from pyspark.rdd import RDD
+ data_rdd = self.sc.parallelize(data)
+ data_java_rdd = data_rdd._to_java_object_rdd()
+ data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
+ converted_rdd = RDD(data_python_rdd, self.sc)
+ self.assertEqual(2, converted_rdd.count())
+
+ # conversion between python and java RDD threw exceptions
+ data_java_rdd = converted_rdd._to_java_object_rdd()
+ data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
+ converted_rdd = RDD(data_python_rdd, self.sc)
+ self.assertEqual(2, converted_rdd.count())
+
class ProfilerTests(PySparkTestCase):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]