[ https://issues.apache.org/jira/browse/SPARK-4349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228493#comment-14228493 ]
Patrick Wendell commented on SPARK-4349:
----------------------------------------

Hey Matt,

It turns out that parallel collections are not the only RDDs where our "sampled pre-emptive serialization" trick can break. Other types of RDDs can have discrepancies between partitions, such that some partitions serialize properly and others don't. I think those other cases are actually more serious than the parallel-collections one, because parallelize() is mostly used for prototyping; I've seen the more general issue affect production workloads, so it would be good to fix. On top of this, we could generally stand to have better error reporting for failed serialization (related work: SPARK-3694).

In terms of solutions, it would be nice to find one that works in the general case. Matt, did you look at all into how complicated it would be to catch these errors at the time of serialization and propagate them up correctly, such that the task set is aborted? That would be the most general and robust solution, although it could be complicated.

> Spark driver hangs on sc.parallelize() if exception is thrown during
> serialization
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-4349
>                 URL: https://issues.apache.org/jira/browse/SPARK-4349
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Matt Cheah
>             Fix For: 1.3.0
>
>
> Executing the following in the Spark Shell will lead to the shell hanging
> after a stack trace is printed. The serializer is set to the Kryo serializer.
> {code}
> scala> import com.esotericsoftware.kryo.io.Input
> import com.esotericsoftware.kryo.io.Input
>
> scala> import com.esotericsoftware.kryo.io.Output
> import com.esotericsoftware.kryo.io.Output
>
> scala> class MyKryoSerializable extends com.esotericsoftware.kryo.KryoSerializable {
>   def write(kryo: com.esotericsoftware.kryo.Kryo, output: Output) {
>     throw new com.esotericsoftware.kryo.KryoException
>   }
>   def read(kryo: com.esotericsoftware.kryo.Kryo, input: Input) {
>     throw new com.esotericsoftware.kryo.KryoException
>   }
> }
> defined class MyKryoSerializable
>
> scala> sc.parallelize(Seq(new MyKryoSerializable, new MyKryoSerializable)).collect
> {code}
>
> A stack trace is printed during serialization as expected, but another stack
> trace is printed afterwards, indicating that the driver can't recover:
>
> {code}
> 14/11/11 14:10:03 ERROR OneForOneStrategy: actor name [ExecutorActor] is not unique!
> akka.actor.PostRestartException: exception post restart (class java.io.IOException)
>         at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:249)
>         at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:247)
>         at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:302)
>         at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:297)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:247)
>         at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
>         at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.actor.InvalidActorNameException: actor name [ExecutorActor] is not unique!
>         at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
>         at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
>         at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
>         at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
>         at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
>         at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
>         at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
>         at org.apache.spark.executor.Executor.<init>(Executor.scala:97)
>         at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:53)
>         at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>         at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>         at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343)
>         at akka.actor.Props.newActor(Props.scala:252)
>         at akka.actor.ActorCell.newActor(ActorCell.scala:552)
>         at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:234)
>         ... 11 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
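Editor's note: the fix Patrick proposes in the comment, attempting serialization on the driver up front and propagating any failure so the task set is aborted instead of the driver wedging, can be illustrated in miniature. This is a sketch, not Spark's actual implementation: the `ensureSerializable` helper is hypothetical, and it uses plain Java serialization where Spark would invoke the configured serializer (Kryo in this report).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationCheck {
    // Hypothetical helper: try to serialize a task payload before scheduling it.
    // A failure surfaces as an ordinary exception on the driver's calling thread,
    // which a scheduler can turn into a cleanly aborted task set with a clear
    // error, rather than discovering the problem mid-flight.
    static void ensureSerializable(Object payload) throws IOException {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Throws NotSerializableException (an IOException) for bad payloads.
            out.writeObject(payload);
        }
    }

    public static void main(String[] args) {
        try {
            ensureSerializable(new Object() {}); // anonymous class, not Serializable
            System.out.println("ok");
        } catch (IOException e) {
            // prints "caught: NotSerializableException"
            System.out.println("caught: " + e.getClass().getSimpleName());
        }
    }
}
```

The point of failing fast is only that the error stays on the thread that called collect(), where it can be reported and the job aborted; whatever change actually landed for this ticket in 1.3.0 may differ in detail.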