[ https://issues.apache.org/jira/browse/SPARK-4349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14207392#comment-14207392 ]

Matt Cheah commented on SPARK-4349:
-----------------------------------

Investigation showed that the DAGScheduler may not catch unserializable tasks, 
while the TaskSetManager assumes that serialization exceptions have already 
been caught by the DAGScheduler.

What happens is that in DAGScheduler.submitMissingTasks, a Seq of tasks is 
created and only the first task in the set is proactively serialized to check 
for exceptions. With parallel collection partitions, however (as in the code I 
provided above), the first task can serialize successfully because its 
partition holds an empty array of values, while later tasks in the set may 
hold the actual data that cannot be serialized.
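The empty first partition falls out of the slicing arithmetic used when parallelize spreads a small Seq across many slices. A rough sketch of that arithmetic (a simplified stand-in for ParallelCollectionRDD's slicing, not the actual Spark code):

```scala
// Split a Seq into numSlices contiguous slices, the way a 2-element
// collection parallelized with default parallelism 8 would be split.
def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] =
  (0 until numSlices).map { i =>
    val start = ((i * seq.length.toLong) / numSlices).toInt
    val end   = (((i + 1) * seq.length.toLong) / numSlices).toInt
    seq.slice(start, end)
  }

val parts = slice(Seq("a", "b"), 8)
println(parts.head.isEmpty)  // true: the first partition carries no elements
```

With 2 elements over 8 slices, slices 0 through 2 come out empty, so the task built from partition 0 has nothing unserializable in it and the proactive check passes.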

I'm not sure what the best way forward is; proactively serializing all of 
the tasks is too expensive.
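To make the gap concrete, here is a minimal sketch of why a check that serializes only the first task misses the failure. Task, its payload, and the serialize helper are hypothetical stand-ins using plain Java serialization, not Spark's scheduler code:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a task whose partition data may or may not be serializable.
case class Task(payload: Any) extends Serializable

def serialize(t: Task): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  oos.writeObject(t)
  oos.close()
}

class NotSerializablePayload  // deliberately not Serializable

// Task 0 gets the empty first partition; task 1 gets the real data.
val tasks = Seq(Task(Array.empty[Any]), Task(new NotSerializablePayload))

// The proactive check only tries the first task, mirroring submitMissingTasks.
val firstOk =
  try { serialize(tasks.head); true }
  catch { case _: NotSerializableException => false }

// Checking every task would catch the failure, but costs a full extra
// serialization pass over the task set.
val allOk = tasks.forall { t =>
  try { serialize(t); true }
  catch { case _: NotSerializableException => false }
}

println(firstOk)  // true  -- the check passes
println(allOk)    // false -- the second task would blow up at launch time
```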

> Spark driver hangs on sc.parallelize() if exception is thrown during 
> serialization
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-4349
>                 URL: https://issues.apache.org/jira/browse/SPARK-4349
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Matt Cheah
>             Fix For: 1.3.0
>
>
> Executing the following in the Spark Shell will lead to the Spark Shell 
> hanging after a stack trace is printed. The serializer is set to the Kryo 
> serializer.
> {code}
> scala> import com.esotericsoftware.kryo.io.Input
> import com.esotericsoftware.kryo.io.Input
> scala> import com.esotericsoftware.kryo.io.Output
> import com.esotericsoftware.kryo.io.Output
> scala> class MyKryoSerializable extends com.esotericsoftware.kryo.KryoSerializable {
>      |   def write(kryo: com.esotericsoftware.kryo.Kryo, output: Output) { throw new com.esotericsoftware.kryo.KryoException }
>      |   def read(kryo: com.esotericsoftware.kryo.Kryo, input: Input) { throw new com.esotericsoftware.kryo.KryoException }
>      | }
> defined class MyKryoSerializable
> scala> sc.parallelize(Seq(new MyKryoSerializable, new 
> MyKryoSerializable)).collect
> {code}
> A stack trace is printed during serialization as expected, but another stack 
> trace is printed afterwards, indicating that the driver can't recover:
> {code}
> 14/11/11 14:10:03 ERROR OneForOneStrategy: actor name [ExecutorActor] is not unique!
> akka.actor.PostRestartException: exception post restart (class java.io.IOException)
>       at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:249)
>       at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:247)
>       at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:302)
>       at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:297)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:247)
>       at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
>       at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
>       at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
>       at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>       at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.actor.InvalidActorNameException: actor name [ExecutorActor] is not unique!
>       at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
>       at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
>       at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
>       at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
>       at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
>       at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
>       at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
>       at org.apache.spark.executor.Executor.<init>(Executor.scala:97)
>       at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:53)
>       at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>       at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>       at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343)
>       at akka.actor.Props.newActor(Props.scala:252)
>       at akka.actor.ActorCell.newActor(ActorCell.scala:552)
>       at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:234)
>       ... 11 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
