Antoine Amend created SPARK-3926:
------------------------------------

             Summary: result of JavaRDD collectAsMap() is not serializable
                 Key: SPARK-3926
                 URL: https://issues.apache.org/jira/browse/SPARK-3926
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 1.1.0
         Environment: CentOS / Spark 1.1 / Hadoop Hortonworks 2.4.0.2.1.2.0-402
            Reporter: Antoine Amend


Using the Java API, I want to collect the result of a JavaPairRDD<String, String> as a Map using the collectAsMap function:

Map<String, String> map = myJavaRDD.collectAsMap();

This works fine on its own, but when passing this map to another function, such as...

myOtherJavaRDD.mapToPair(new CustomFunction(map))

...the job fails with the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
        at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:44)
        ../.. MY CLASS ../..
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)

This seems to be due to WrapAsJava.scala returning a MapWrapper, which is not serializable:

../..
  implicit def mapAsJavaMap[A, B](m: Map[A, B]): ju.Map[A, B] = m match {
    //case JConcurrentMapWrapper(wrapped) => wrapped
    case JMapWrapper(wrapped) => wrapped.asInstanceOf[ju.Map[A, B]]
    case _ => new MapWrapper(m)
  }
../..
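The failure can be reproduced without Spark, since it is really a plain java.io serialization issue: any Map implementation that does not implement java.io.Serializable is rejected by ObjectOutputStream. The sketch below uses a hypothetical stand-in class (NonSerializableMap), not the actual Scala MapWrapper, to show the same NotSerializableException:

```java
import java.io.*;
import java.util.*;

public class MapWrapperDemo {
    // Hypothetical stand-in for scala.collection.convert.Wrappers$MapWrapper:
    // a Map implementation that does NOT implement Serializable.
    static class NonSerializableMap<K, V> extends AbstractMap<K, V> {
        private final Map<K, V> backing;
        NonSerializableMap(Map<K, V> backing) { this.backing = backing; }
        @Override public Set<Entry<K, V>> entrySet() { return backing.entrySet(); }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> backing = new HashMap<>();
        backing.put("a", "1");
        Map<String, String> wrapped = new NonSerializableMap<>(backing);

        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Fails here: the wrapper class is not Serializable, regardless of
            // whether its contents (String keys/values) are.
            oos.writeObject(wrapped);
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

This mirrors what happens inside ClosureCleaner: Spark serializes the whole closure with Java serialization, so a single non-serializable field is enough to fail the task.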

The workaround is to manually copy this map into a serializable one (e.g. a java.util.HashMap) before passing it along:

Map<String, String> map = myJavaRDD.collectAsMap();
Map<String, String> tmp = new HashMap<String, String>(map);
myOtherJavaRDD.mapToPair(new CustomFunction(tmp))
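To check that the copy really fixes the serialization problem, the HashMap copy can be round-tripped through plain Java serialization (the same mechanism Spark's default JavaSerializer uses). This is a minimal sketch with hypothetical names (roundTrip, CopyMapDemo); it does not depend on Spark:

```java
import java.io.*;
import java.util.*;

public class CopyMapDemo {
    // Serialize and deserialize an object, as Spark's JavaSerializer would.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Pretend 'collected' came from collectAsMap(); copy it first.
        Map<String, String> collected = new HashMap<>();
        collected.put("key", "value");
        Map<String, String> tmp = new HashMap<String, String>(collected);

        // java.util.HashMap implements Serializable, so this succeeds.
        Map<String, String> restored = roundTrip(tmp);
        System.out.println(restored.equals(collected));  // prints "true"
    }
}
```

The copy costs one extra pass over the collected entries, which is usually negligible next to the collect itself.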




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
