I added the config option to use the non-default serializer. However, at
the time, Kryo failed to serialize pretty much any closure, so that option
was never really used or recommended.
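
For reference, a minimal sketch of the configuration being discussed; the
property names are the ones used in the report quoted below, and the
registrator class name is taken from the poster's log:

    import org.apache.spark.{SparkConf, SparkContext}

    // Switch both the data serializer and the closure serializer to Kryo,
    // and point Spark at a custom registrator (class name from the quoted
    // log below).
    val conf = new SparkConf()
      .setAppName("kryo-closure-test")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "pickles.kryo.PicklesRegistrator")
    val sc = new SparkContext(conf)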

Since then the Scala ecosystem has evolved, and some other projects have
started using Kryo to serialize more Scala data structures, so I wouldn't
be surprised if there is a way to work around this now. However, I don't
have enough time to look into it at this point. If you do, please do post
your findings. Thanks.



On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <so...@yieldbot.com> wrote:

> apologies for the cross-list posts, but I've gotten zero response in the
> user list and I guess this list is probably more appropriate.
>
> According to the documentation, using the KryoSerializer for closures is
> supported. However, when I try to set `spark.closure.serializer` to
> `org.apache.spark.serializer.KryoSerializer`, things fail pretty miserably.
>
> The first thing that happens is that it repeatedly throws exceptions saying
> it cannot locate my registrator class, which is in my assembly jar, like so:
>
> 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
> java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:270)
> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
> at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
> at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
>
> Now, I would expect it not to be able to find this class, since it hasn't
> yet fetched my assembly jar to the executors. Once it does fetch my jar,
> those exceptions stop. Next, all the executor tasks die with the following
> exception:
>
> java.nio.ReadOnlyBufferException
> at java.nio.ByteBuffer.array(ByteBuffer.java:961)
> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
>
> AFAIK, I'm not doing anything out of the ordinary, just turning on Kryo and
> using the registrator mechanism to register a couple of custom serializers.
>
> The reason I tried turning on Kryo for closures in the first place is a
> different bug that I was hitting during task fetching and deserialization
> on my executors, which I detailed here:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
>
> Here's hoping someone on this list can help me track down what's happening,
> as I didn't get a single reply on the user list.
>
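
For readers following along, a registrator of the kind described in the quoted
report might look roughly like the sketch below; the class name and the
registered types here are hypothetical placeholders, not the poster's actual
code:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical type standing in for whatever the application serializes.
    case class Point(x: Double, y: Double)

    // Registrator wired up via spark.kryo.registrator. Plain registration is
    // shown; a custom Kryo Serializer could be passed as a second argument
    // to register() instead.
    class ExampleRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Point])
        kryo.register(classOf[Array[Point]])
      }
    }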
