Thanks. Do you mind playing with chill-scala a little bit to see if it
actually works well for closures? One way to try it is to hard-code the
serializer to use Kryo with chill-scala, and then run through all the unit
tests.
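
One quick way to wire that up for such a test (untested sketch; it assumes
chill-scala's AllScalaRegistrar and its apply(Kryo) entry point) would be a
registrator along these lines:

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.AllScalaRegistrar
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical registrator that pulls in all of chill-scala's
    // registrations for common Scala types before running the unit tests.
    class ChillScalaRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        new AllScalaRegistrar().apply(kryo)
      }
    }

and then pointing spark.kryo.registrator at it while the serializer is forced
to Kryo for the test run.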

If it works well, we can incorporate that in the next release (probably not
1.0, but after that).


On Sun, May 4, 2014 at 9:08 PM, Soren Macbeth <so...@yieldbot.com> wrote:

> fwiw, it seems like it wouldn't be very difficult to integrate chill-scala,
> since you're already using chill-java, and you'd probably get kryo
> serialization of closures and all sorts of other scala stuff for free. All
> that would be needed would be to include the dependency and then update
> KryoSerializer to register the stuff in chill-scala.
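>
> Concretely, the build change would be roughly this (just a sketch;
> chillVersion is a placeholder that should match the chill-java version
> already on the classpath):
>
>     libraryDependencies += "com.twitter" %% "chill" % chillVersion
>
> plus the extra registrations in KryoSerializer.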
>
> In that case, you could probably safely make kryo the default serializer,
> which I think would be desirable in general.
>
>
> On Sun, May 4, 2014 at 8:48 PM, Reynold Xin <r...@databricks.com> wrote:
>
> > Good idea. I submitted a pull request for the doc update here:
> > https://github.com/apache/spark/pull/642
> >
> >
> > On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth <so...@yieldbot.com> wrote:
> >
> > > Thanks for the reply!
> > >
> > > Ok, if that's the case, I'd recommend a note to that effect in the docs
> > > at least.
> > >
> > > Just to give some more context here, I'm working on a Clojure DSL for
> > > Spark called Flambo, which I plan to open source shortly. If I could,
> > > I'd like to focus on the initial bug that I hit.
> > >
> > > Exception in thread "main" org.apache.spark.SparkException: Job aborted:
> > > Exception while deserializing and fetching task:
> > > com.esotericsoftware.kryo.KryoException:
> > > java.lang.IllegalArgumentException: Can not set final
> > > scala.collection.convert.Wrappers field
> > > scala.collection.convert.Wrappers$SeqWrapper.$outer to
> > > clojure.lang.PersistentVector
> > > Serialization trace:
> > > $outer (scala.collection.convert.Wrappers$SeqWrapper)
> > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> > >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> > >         at scala.Option.foreach(Option.scala:236)
> > >         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> > >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> > >         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> > >         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> > >         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> > >         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> > >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > > This happens immediately after all the tasks of a reduce stage complete
> > > successfully. Here is the function throwing the exception:
> > >
> > > https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43
> > >
> > > This is where I get lost. From googling around, it seems that scala is
> > > trying to wrap the result of my task, which contains
> > > clojure.lang.PersistentVector objects, in a scala collection, but I don't
> > > know why it's doing that. I have a registered kryo serializer for
> > > clojure.lang.PersistentVector.
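> > >
> > > (The registrator itself is written in Clojure; paraphrased as a Scala
> > > sketch, with MyRegistrator and PersistentVectorSerializer standing in for
> > > my actual classes, it does roughly this:)
> > >
> > >     import com.esotericsoftware.kryo.Kryo
> > >     import org.apache.spark.serializer.KryoRegistrator
> > >
> > >     class MyRegistrator extends KryoRegistrator {
> > >       override def registerClasses(kryo: Kryo) {
> > >         // Register the custom serializer for Clojure's vector type.
> > >         kryo.register(classOf[clojure.lang.PersistentVector],
> > >                       new PersistentVectorSerializer)
> > >       }
> > >     }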
> > >
> > > Based on this line, it looks like it's trying to use the closure
> > > serializer, yet the exception thrown is a
> > > com.esotericsoftware.kryo.KryoException:
> > >
> > > https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39
> > >
> > > Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer
> > > from trying to deal with my clojure.lang.PersistentVector class?
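> > >
> > > (i.e., persisting it explicitly with something like the following, where
> > > rdd stands for the RDD in question:)
> > >
> > >     import org.apache.spark.storage.StorageLevel
> > >     rdd.persist(StorageLevel.MEMORY_ONLY_SER)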
> > >
> > > Where do I go from here?
> > >
> > >
> > > On Sun, May 4, 2014 at 12:50 PM, Reynold Xin <r...@databricks.com> wrote:
> > >
> > > > I added the config option to use the non-default serializer. However,
> > > > at the time, Kryo failed to serialize pretty much any closures, so that
> > > > option was never really used / recommended.
> > > >
> > > > Since then the Scala ecosystem has developed, and some other projects
> > > > are starting to use Kryo to serialize more Scala data structures, so I
> > > > wouldn't be surprised if there is a way to work around this now.
> > > > However, I don't have enough time to look into it at this point. If you
> > > > do, please do post your findings. Thanks.
> > > >
> > > >
> > > >
> > > > On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <so...@yieldbot.com> wrote:
> > > >
> > > > > apologies for the cross-list posts, but I've gotten zero response in
> > > > > the user list and I guess this list is probably more appropriate.
> > > > >
> > > > > According to the documentation, using the KryoSerializer for closures
> > > > > is supported. However, when I try to set `spark.closure.serializer`
> > > > > to `org.apache.spark.serializer.KryoSerializer`, things fail pretty
> > > > > miserably.
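> > > > >
> > > > > The relevant configuration, paraphrased as a Scala sketch (my actual
> > > > > setup is in Clojure), is roughly:
> > > > >
> > > > >     import org.apache.spark.SparkConf
> > > > >
> > > > >     val conf = new SparkConf()
> > > > >       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> > > > >       .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
> > > > >       .set("spark.kryo.registrator", "pickles.kryo.PicklesRegistrator")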
> > > > >
> > > > > The first thing that happens is that it throws exceptions over and
> > > > > over saying that it cannot locate my registrator class, which is
> > > > > located in my assembly jar, like so:
> > > > >
> > > > > 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
> > > > > spark.kryo.registrator
> > > > > java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
> > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > > > > at java.lang.Class.forName0(Native Method)
> > > > > at java.lang.Class.forName(Class.java:270)
> > > > > at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
> > > > > at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
> > > > > at scala.Option.foreach(Option.scala:236)
> > > > > at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
> > > > > at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
> > > > > at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
> > > > > at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
> > > > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> > > > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at javax.security.auth.Subject.doAs(Subject.java:415)
> > > > > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> > > > > at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> > > > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> > > > > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > >
> > > > > Now, I would expect it not to be able to find this class, since it
> > > > > hasn't yet fetched my assembly jar to the executors. Once it does
> > > > > fetch my jar, those exceptions stop. Next, all the executor tasks die
> > > > > with the following exception:
> > > > >
> > > > > java.nio.ReadOnlyBufferException
> > > > > at java.nio.ByteBuffer.array(ByteBuffer.java:961)
> > > > > at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
> > > > > at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
> > > > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> > > > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at javax.security.auth.Subject.doAs(Subject.java:415)
> > > > > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> > > > > at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> > > > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> > > > > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > >
> > > > > AFAIK, I'm not doing anything out of the ordinary, just turning on
> > > > > kryo and using the registrator mechanism to register a couple of
> > > > > custom serializers.
> > > > >
> > > > > The reason I tried turning on kryo for closures in the first place is
> > > > > because of a different bug that I was hitting during fetching and
> > > > > deserializing of tasks from my executors, which I detailed here:
> > > > >
> > > > > http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
> > > > >
> > > > > Here's hoping someone on this list can help me track down what's
> > > > > happening, as I didn't get a single reply on the user list.
> > > > >
> > > >
> > >
> >
>
