[ https://issues.apache.org/jira/browse/SPARK-4349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230277#comment-14230277 ]
Matt Cheah commented on SPARK-4349:
-----------------------------------
I agree that's a better solution. I fell back to this approach because I didn't
want to change the core architecture, and I assumed from the code comments that
it was the "correct" way to do things. I'm all for catching the serialization
errors deeper down in the stack. I can investigate whether this is feasible now.
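
To make "catching the serialization errors deeper down in the stack" concrete, here is a minimal sketch of the general idea only. It is not Spark's code and not the eventual fix. It assumes plain Java serialization in place of Kryo so that it needs no extra dependencies, and every name in it (AlwaysFailsToSerialize, SerializeResult, SafeSerialization.trySerialize) is hypothetical. The point is that the failure is caught at the call that serializes the value and handed back as an ordinary result, so the caller can fail the job instead of letting the exception escape into an actor.

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}
import scala.util.control.NonFatal

// Hypothetical stand-in for the MyKryoSerializable class in the report.
// Java serialization is used here only to keep the sketch dependency-free.
class AlwaysFailsToSerialize extends Serializable {
  // Java serialization calls this private hook; throwing here simulates
  // a serializer that fails while writing the object.
  @throws[IOException]
  private def writeObject(out: ObjectOutputStream): Unit =
    throw new IOException("simulated serialization failure")
}

// Hypothetical result type; not part of Spark's API.
sealed trait SerializeResult
final case class Serialized(bytes: Array[Byte]) extends SerializeResult
final case class SerializationFailed(cause: Throwable) extends SerializeResult

object SafeSerialization {
  // Catch non-fatal errors where the value is serialized and return them as
  // a value, so the caller decides how to fail instead of the exception
  // propagating into whatever actor or event loop invoked the serializer.
  def trySerialize(value: AnyRef): SerializeResult = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try {
      out.writeObject(value)
      out.flush()
      Serialized(buffer.toByteArray)
    } catch {
      case NonFatal(e) => SerializationFailed(e)
    } finally {
      out.close()
    }
  }
}
```

A caller would pattern match on the result and, on SerializationFailed, abort the affected job with the cause attached. Whether the scheduler can be changed to do this is exactly what still needs investigating.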
> Spark driver hangs on sc.parallelize() if exception is thrown during serialization
> ----------------------------------------------------------------------------------
>
> Key: SPARK-4349
> URL: https://issues.apache.org/jira/browse/SPARK-4349
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.1.0
> Reporter: Matt Cheah
> Fix For: 1.3.0
>
>
> With the serializer set to the Kryo serializer, executing the following in
> the Spark Shell causes the shell to hang after a stack trace is printed.
> {code}
> scala> import com.esotericsoftware.kryo.io.Input
> import com.esotericsoftware.kryo.io.Input
> scala> import com.esotericsoftware.kryo.io.Output
> import com.esotericsoftware.kryo.io.Output
> scala> class MyKryoSerializable extends com.esotericsoftware.kryo.KryoSerializable {
>      |   def write(kryo: com.esotericsoftware.kryo.Kryo, output: Output) { throw new com.esotericsoftware.kryo.KryoException }
>      |   def read(kryo: com.esotericsoftware.kryo.Kryo, input: Input) { throw new com.esotericsoftware.kryo.KryoException }
>      | }
> defined class MyKryoSerializable
> scala> sc.parallelize(Seq(new MyKryoSerializable, new MyKryoSerializable)).collect
> {code}
> A stack trace is printed during serialization as expected, but another stack
> trace is printed afterwards, indicating that the driver can't recover:
> {code}
> 14/11/11 14:10:03 ERROR OneForOneStrategy: actor name [ExecutorActor] is not unique!
> akka.actor.PostRestartException: exception post restart (class java.io.IOException)
>     at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:249)
>     at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:247)
>     at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:302)
>     at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:297)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:247)
>     at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
>     at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.actor.InvalidActorNameException: actor name [ExecutorActor] is not unique!
>     at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
>     at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
>     at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
>     at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
>     at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
>     at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
>     at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
>     at org.apache.spark.executor.Executor.<init>(Executor.scala:97)
>     at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:53)
>     at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>     at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>     at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343)
>     at akka.actor.Props.newActor(Props.scala:252)
>     at akka.actor.ActorCell.newActor(ActorCell.scala:552)
>     at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:234)
>     ... 11 more
> {code}