[ https://issues.apache.org/jira/browse/SPARK-4349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14207392#comment-14207392 ]
Matt Cheah commented on SPARK-4349:
-----------------------------------

Investigation showed that the DAGScheduler may not catch unserializable tasks, and the TaskSetManager assumes that serialization exceptions have already been caught in the DAGScheduler. In DAGScheduler.submitMissingTasks, a Seq of tasks is created and the first task in the set is proactively serialized to check for exceptions. However, with parallel collection partitions, as in the code I provided above, the first task can serialize successfully because its partition holds an empty array of values, while other tasks in the set hold the actual data that cannot be serialized. I'm not sure of the best way forward; proactively serializing all of the tasks would be too expensive.

> Spark driver hangs on sc.parallelize() if exception is thrown during
> serialization
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-4349
>                 URL: https://issues.apache.org/jira/browse/SPARK-4349
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Matt Cheah
>             Fix For: 1.3.0
>
>
> Executing the following in the Spark Shell will lead to the Spark Shell
> hanging after a stack trace is printed. The serializer is set to the Kryo
> serializer.
> {code}
> scala> import com.esotericsoftware.kryo.io.Input
> import com.esotericsoftware.kryo.io.Input
>
> scala> import com.esotericsoftware.kryo.io.Output
> import com.esotericsoftware.kryo.io.Output
>
> scala> class MyKryoSerializable extends com.esotericsoftware.kryo.KryoSerializable {
>   def write(kryo: com.esotericsoftware.kryo.Kryo, output: Output) {
>     throw new com.esotericsoftware.kryo.KryoException
>   }
>   def read(kryo: com.esotericsoftware.kryo.Kryo, input: Input) {
>     throw new com.esotericsoftware.kryo.KryoException
>   }
> }
> defined class MyKryoSerializable
>
> scala> sc.parallelize(Seq(new MyKryoSerializable, new MyKryoSerializable)).collect
> {code}
> A stack trace is printed during serialization as expected, but another stack
> trace is printed afterwards, indicating that the driver can't recover:
> {code}
> 14/11/11 14:10:03 ERROR OneForOneStrategy: actor name [ExecutorActor] is not unique!
> akka.actor.PostRestartException: exception post restart (class java.io.IOException)
> 	at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:249)
> 	at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:247)
> 	at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:302)
> 	at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:297)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:247)
> 	at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
> 	at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
> 	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
> 	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> 	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.actor.InvalidActorNameException: actor name [ExecutorActor] is not unique!
> 	at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
> 	at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
> 	at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
> 	at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
> 	at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
> 	at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
> 	at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
> 	at org.apache.spark.executor.Executor.<init>(Executor.scala:97)
> 	at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:53)
> 	at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
> 	at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
> 	at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343)
> 	at akka.actor.Props.newActor(Props.scala:252)
> 	at akka.actor.ActorCell.newActor(ActorCell.scala:552)
> 	at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:234)
> 	... 11 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
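The failure mode described in the comment can be sketched outside Spark. The snippet below is a simplified, hypothetical model, not Spark's actual code: `sliceBounds` mimics the index arithmetic used when a parallelized collection is split into partitions, and plain Java serialization stands in for Kryo. With two elements spread across eight slices, the first slice comes out empty, so a check that proactively serializes only the first task succeeds even though later tasks cannot be serialized:

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}

// Simplified sketch of the slice arithmetic for parallelized collections:
// slice i of a length-len sequence split into numSlices parts covers the
// half-open index range [i*len/numSlices, (i+1)*len/numSlices).
def sliceBounds(len: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map(i => ((i * len) / numSlices, ((i + 1) * len) / numSlices))

// Hypothetical stand-in for the Kryo example in the issue description,
// using Java serialization: an object that always fails to serialize.
class Unserializable extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit =
    throw new IOException("cannot serialize")
}

// Returns true if obj round-trips through an ObjectOutputStream.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: IOException => false
  }

// Two elements across eight slices: slices 0-2 and 4-6 are empty; only
// slices 3 and 7 actually hold an unserializable element.
val partitions: Seq[Array[Unserializable]] =
  sliceBounds(len = 2, numSlices = 8).map { case (start, end) =>
    Array.fill(end - start)(new Unserializable)
  }

// The first partition is empty, so serializing only "task 0" succeeds...
val firstTaskSerializes = serializes(partitions.head)   // true
// ...even though some task in the set would fail on the executors.
val allTasksSerialize = partitions.forall(serializes)   // false
```

Under this model, any fix that samples a fixed task (first, last, random) can be defeated by a skewed collection, which is why checking a single task is insufficient while checking all tasks is, as the comment notes, too expensive.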