[ https://issues.apache.org/jira/browse/SPARK-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-4882:
------------------------------
Description:
When KryoSerializer is used, PySpark throws a NullPointerException when
trying to send broadcast variables to workers. This issue does not occur when
the master is {{local}} or when the default JavaSerializer is used.
*Reproduction*:
Run
{code}
SPARK_LOCAL_IP=127.0.0.1 ./bin/pyspark --master 'local-cluster[2,2,512]' \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
{code}
then run
{code}
b = sc.broadcast("hello")
sc.parallelize([0]).flatMap(lambda x: b.value).collect()
{code}
This job fails because all tasks throw the following exception:
{code}
14/12/28 14:26:08 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 8, localhost): java.lang.NullPointerException
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:589)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:232)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:228)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:228)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1515)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:202)
{code}
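The top frame of the trace is {{PythonRDD.writeUTF}}. As a rough illustration only, and assuming (this report does not confirm it) that the helper sends a string as a 4-byte big-endian length followed by its UTF-8 bytes, the sketch below mimics that framing in Python. The function names {{write_utf}} and {{read_utf}} are hypothetical. It shows one plausible reading of the trace: if the string being framed is missing (null), the failure happens on the sending side, inside the JVM writer thread, before any bytes reach the Python worker.

```python
import io
import struct


def write_utf(s, out):
    """Hypothetical analogue of a length-prefixed UTF-8 string writer."""
    # If s is None, s.encode raises AttributeError: the Python counterpart of
    # calling getBytes on a null String in the JVM (NullPointerException).
    data = s.encode("utf-8")
    out.write(struct.pack(">i", len(data)))  # 4-byte big-endian length prefix
    out.write(data)                          # then the raw UTF-8 bytes


def read_utf(inp):
    """Read back one string written by write_utf."""
    (length,) = struct.unpack(">i", inp.read(4))
    return inp.read(length).decode("utf-8")


buf = io.BytesIO()
write_utf("hello", buf)
buf.seek(0)
print(read_utf(buf))  # prints: hello

try:
    write_utf(None, io.BytesIO())  # a missing value fails before anything is written
except AttributeError as exc:
    print("failed:", exc)
```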
KryoSerializer may be enabled in the {{spark-defaults.conf}} file, so users may
hit this error and be confused.
*Workaround*:
Override the {{spark.serializer}} setting to use the default Java serializer.
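The workaround can be applied per application instead of editing a shared {{spark-defaults.conf}}. The sketch below is a simplified illustration: it assumes the file's common layout of one whitespace-separated key and value per line with {{#}} comments (it does not handle every syntax Spark's own loader accepts), and the helper names are hypothetical, not Spark APIs. The resulting pairs could then be passed to PySpark, for example through {{SparkConf().setAll(...)}}, or the same override can be supplied on the command line with {{--conf spark.serializer=org.apache.spark.serializer.JavaSerializer}}.

```python
# Simplified illustration; helper names are hypothetical, not Spark APIs.
JAVA_SERIALIZER = "org.apache.spark.serializer.JavaSerializer"
KRYO_SERIALIZER = "org.apache.spark.serializer.KryoSerializer"


def parse_spark_defaults(text):
    """Parse 'key value' lines (assumed spark-defaults.conf layout) into a dict."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)  # key, then the rest of the line as the value
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf


def with_java_serializer(conf):
    """Return a copy of conf with spark.serializer forced to JavaSerializer."""
    overridden = dict(conf)  # leave the shared defaults untouched
    overridden["spark.serializer"] = JAVA_SERIALIZER
    return overridden


defaults = parse_spark_defaults("""
# shared cluster defaults
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.memory 2g
""")
settings = with_java_serializer(defaults)
print(settings["spark.serializer"])  # prints: org.apache.spark.serializer.JavaSerializer
# With PySpark installed, the pairs could be applied per application, e.g.:
#   conf = SparkConf().setAll(list(settings.items()))
#   sc = SparkContext(conf=conf)
```

Copying the dict rather than mutating it keeps the shared defaults intact, so Java/Scala jobs that read the same file can still use KryoSerializer.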
was:
This issue first plagued me weeks ago, and I finally hit a point where I just
had to find a solution!
My spark-defaults.conf file had this property set:
spark.serializer org.apache.spark.serializer.KryoSerializer
The following example works fine IN LOCAL mode
(from https://github.com/apache/spark/blob/master/python/pyspark/broadcast.py):
{code}
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
{code}
However, when I initialize the SparkContext to point at my Mesos cluster,
I get the following stack trace:
{code}
14/12/18 08:08:37 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, 10.20.100.202, PROCESS_LOCAL, 1120 bytes)
14/12/18 08:08:46 INFO storage.BlockManagerMasterActor: Registering block manager 10.20.100.202:55734 with 1060.3 MB RAM, BlockManagerId(20141217-015001-1278350346-5050-28-3, 10.20.100.202, 55734)
14/12/18 08:08:47 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 10.20.100.202:55734 (size: 6.3 KB, free: 1060.3 MB)
14/12/18 08:08:47 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.20.100.202:55734 (size: 68.0 B, free: 1060.3 MB)
14/12/18 08:08:47 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, 10.20.100.202): java.lang.NullPointerException
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:589)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:232)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:228)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:228)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1459)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:202)
{code}
Discovering that local mode works fine was painful: I had originally been
running Spark under Mesos and was trying every which way to find out why I was
hitting an NPE.
Only when I found the local example did I make progress, and I eventually
tracked it down to the KryoSerializer configuration.
When I commented out the `spark.serializer` configuration (and thus used the
default JavaSerializer), the broadcast finally worked!
I don't even know if KryoSerializer is an appropriate setting for a pyspark
program (seems like no?).
Even so, who is to say that I won't be running Java/Scala programs in tandem
(using the same spark-defaults file), which presumably would want to benefit
from the KryoSerializer?
That said, a workaround seems to be to override the `spark.serializer` setting
in my pyspark code or to change the defaults.
thanks,
Fi
Target Version/s: 1.0.3, 1.3.0, 1.2.1
Affects Version/s: 1.3.0
1.2.0
Assignee: Josh Rosen
Labels: (was: broadcast kryo npe pyspark serializers)
Summary: PySpark broadcast breaks when using KryoSerializer
(was: pyspark broadcast breaks if spark serializer configuration set to
KryoSerializer running under Mesos)
I've edited this issue's description to use a reproduction which does not
involve Mesos or changes to spark-defaults.conf, and to trim down the
description to its bare essentials (I hope you don't mind; this makes the
description easier to follow for reviewers, I think). I suspect that the bug
here involves code paths that aren't hit in {{local}} mode, which is why you
weren't able to reproduce this except on an actual cluster.
To address a question raised in the original description:
{quote}
I don't even know if KryoSerializer is an appropriate setting for a pyspark
program (seems like no?).
{quote}
If you're using MLlib or SparkSQL with PySpark, then Spark will sometimes need
to serialize Java objects rather than byte arrays, so in principle PySpark jobs
can benefit from KryoSerializer, too.
> PySpark broadcast breaks when using KryoSerializer
> --------------------------------------------------
>
> Key: SPARK-4882
> URL: https://issues.apache.org/jira/browse/SPARK-4882
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.1.1, 1.2.0, 1.3.0
> Reporter: Fi
> Assignee: Josh Rosen
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)