I added the config option to use the non-default serializer. However, at the time, Kryo failed to serialize pretty much any closure, so that option was never really used or recommended.
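For reference, turning it on looked roughly like this (a minimal sketch using the standard SparkConf API; the app name and surrounding setup are illustrative, not from this thread):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("kryo-closure-test") // illustrative name
      // Kryo for data serialization (the common, supported case)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Kryo for closure serialization -- the option in question, which at
      // the time choked on most Scala closures
      .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)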
Since then the Scala ecosystem has developed, and some other projects have started using Kryo to serialize more Scala data structures, so I wouldn't be surprised if there is a way to work around this now. However, I don't have enough time to look into it at this point. If you do, please do post your findings. Thanks.

On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <so...@yieldbot.com> wrote:
> apologies for the cross-list posts, but I've gotten zero response on the
> user list and I guess this list is probably more appropriate.
>
> According to the documentation, using the KryoSerializer for closures is
> supported. However, when I try to set `spark.closure.serializer` to
> `org.apache.spark.serializer.KryoSerializer`, things fail pretty miserably.
>
> The first thing that happens is that it throws exceptions over and over
> saying that it cannot locate my registrator class, which is located in my
> assembly jar like so:
>
> 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
> java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:270)
>         at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
>         at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
>         at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
>         at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
>
> Now, I would expect it not to be able to find this class, since it hasn't
> yet fetched my assembly jar to the executors. Once it does fetch my jar,
> those exceptions stop.
> Next, all the executor tasks die with the following exception:
>
> java.nio.ReadOnlyBufferException
>         at java.nio.ByteBuffer.array(ByteBuffer.java:961)
>         at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
>
> AFAIK, I'm not doing anything out of the ordinary, just turning on Kryo
> and using the registrator mechanism to register a couple of custom
> serializers.
>
> The reason I tried turning on Kryo for closures in the first place is
> because of a different bug that I was hitting while fetching and
> deserializing tasks on my executors, which I detailed here:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
>
> Here's hoping someone on this list can help me track down what's happening,
> as I didn't get a single reply on the user list.
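For anyone following along: the registrator mechanism referenced above is just a class implementing KryoRegistrator, wired up through spark.kryo.registrator. A minimal sketch (MyRegistrator is a hypothetical stand-in for pickles.kryo.PicklesRegistrator, whose actual serializers aren't shown in this thread):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Enabled with: conf.set("spark.kryo.registrator", "MyRegistrator")
    // The fully qualified class name must be resolvable on the executors,
    // which is exactly where the ClassNotFoundException above comes from.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // Register application classes here; a custom Kryo Serializer
        // instance can be passed as a second argument to kryo.register.
        kryo.register(classOf[scala.collection.immutable.Range])
      }
    }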