Repository: spark
Updated Branches:
  refs/heads/branch-1.2 1af7ca15f -> 6f47114d9


[SPARK-5361] Multiple Java RDD <-> Python RDD conversions not working correctly

This was found by reading an RDD with `sc.newAPIHadoopRDD` and writing it back
with `rdd.saveAsNewAPIHadoopFile` in PySpark.

Whenever an RDD is converted from JavaRDD to PythonRDD and back to JavaRDD
more than once, the following exception is thrown:

```
15/01/16 10:28:31 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.ArrayList
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:157)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
```

The test case code below reproduces it:

```
from pyspark.rdd import RDD

dl = [
    (u'2', {u'director': u'David Lean'}),
    (u'7', {u'director': u'Andrew Dominik'})
]

dl_rdd = sc.parallelize(dl)
tmp = dl_rdd._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()

tmp = t._to_java_object_rdd()
tmp2 = sc._jvm.SerDe.javaToPython(tmp)
t = RDD(tmp2, sc)
t.count()  # fails here, on the second conversion
```
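
As a rough illustration of the `SerDeUtil.scala` change in this commit, the
pure-Python model below mimics the dispatch on the unpickled batch. It is only
a sketch, not Spark code: `unbatch_strict` and `unbatch_tolerant` are
hypothetical names, a Python `list` stands in for `java.util.ArrayList`, and a
`tuple` stands in for the `Object[]` array named in the exception.

```python
def unbatch_strict(obj):
    """Model of the old behaviour: only a list-shaped batch is accepted."""
    if not isinstance(obj, list):
        # Stands in for the ClassCastException raised by the unconditional cast.
        raise TypeError("%s cannot be cast to list" % type(obj).__name__)
    return list(obj)


def unbatch_tolerant(obj):
    """Model of the patched behaviour: an array-shaped batch is accepted too."""
    if isinstance(obj, tuple):
        # Mirrors the new `case array: Array[Any] => array.toSeq` branch.
        return list(obj)
    # Mirrors the fallback branch that keeps the original cast.
    return unbatch_strict(obj)
```

In this model, a batch that arrives as a tuple is rejected by `unbatch_strict`
but unpacked by `unbatch_tolerant`, while list-shaped batches behave the same
in both.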

Author: Winston Chen <[email protected]>

Closes #4146 from wingchen/master and squashes the following commits:

903df7d [Winston Chen] SPARK-5361, update to toSeq based on the PR
5d90a83 [Winston Chen] SPARK-5361, make python pretty, so to pass PEP 8 checks
126be6b [Winston Chen] SPARK-5361, add in test case
4cf1187 [Winston Chen] SPARK-5361, add in test case
9f1a097 [Winston Chen] add in tuple handling while converting from python RDD back to JavaRDD

(cherry picked from commit 453d7999b88be87bda30d9e73038eb484ee063bd)
Signed-off-by: Josh Rosen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f47114d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f47114d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f47114d

Branch: refs/heads/branch-1.2
Commit: 6f47114d97347d52cc24d46364913a5c9125bb93
Parents: 1af7ca1
Author: Winston Chen <[email protected]>
Authored: Wed Jan 28 11:08:44 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Mon Feb 16 16:51:26 2015 -0800

----------------------------------------------------------------------
 .../spark/api/python/PythonHadoopUtil.scala      |  5 +++++
 .../org/apache/spark/api/python/SerDeUtil.scala  |  5 ++++-
 python/pyspark/tests.py                          | 19 +++++++++++++++++++
 3 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f47114d/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 5ba6617..c9181a2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -138,6 +138,11 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
           mapWritable.put(convertToWritable(k), convertToWritable(v))
         }
         mapWritable
+      case array: Array[Any] => {
+        val arrayWriteable = new ArrayWritable(classOf[Writable])
+        arrayWriteable.set(array.map(convertToWritable(_)))
+        arrayWriteable
+      }
       case other => throw new SparkException(
         s"Data of type ${other.getClass.getName} cannot be used")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f47114d/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 269cdca..fb52a96 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -153,7 +153,10 @@ private[spark] object SerDeUtil extends Logging {
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
         if (batched) {
-          obj.asInstanceOf[JArrayList[_]].asScala
+          obj match {
+            case array: Array[Any] => array.toSeq
+            case _ => obj.asInstanceOf[JArrayList[_]].asScala
+          }
         } else {
           Seq(obj)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f47114d/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bca52a7..1349384 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -714,6 +714,25 @@ class RDDTests(ReusedPySparkTestCase):
         wr_s21 = rdd.sample(True, 0.4, 21).collect()
         self.assertNotEqual(set(wr_s11), set(wr_s21))
 
+    def test_multiple_python_java_RDD_conversions(self):
+        # Regression test for SPARK-5361
+        data = [
+            (u'1', {u'director': u'David Lean'}),
+            (u'2', {u'director': u'Andrew Dominik'})
+        ]
+        from pyspark.rdd import RDD
+        data_rdd = self.sc.parallelize(data)
+        data_java_rdd = data_rdd._to_java_object_rdd()
+        data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
+        converted_rdd = RDD(data_python_rdd, self.sc)
+        self.assertEqual(2, converted_rdd.count())
+
+        # conversion between python and java RDD threw exceptions
+        data_java_rdd = converted_rdd._to_java_object_rdd()
+        data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
+        converted_rdd = RDD(data_python_rdd, self.sc)
+        self.assertEqual(2, converted_rdd.count())
+
 
 class ProfilerTests(PySparkTestCase):
 

