[
https://issues.apache.org/jira/browse/SPARK-4349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228493#comment-14228493
]
Patrick Wendell commented on SPARK-4349:
----------------------------------------
Hey Matt,
It turns out that parallel collections are not the only RDD where our "sampled
pre-emptive serialization" trick can break. Other types of RDDs can have
discrepancies between partitions such that some serialize properly and others
don't. I think those other cases are actually more serious than the parallel
collections case, because parallelize() is mostly used for prototyping; I've
seen the more general issue affect production workloads, so it would be good
to fix. On top of this, we could generally stand to have better error
reporting for failed serialization (related work: SPARK-3694).
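To make the "sampled pre-emptive serialization" idea concrete, here is a minimal sketch of a prefix-sampling check. It uses plain Java serialization for self-containment; Spark's own check runs through its configured serializer, and the name checkSerializable is an assumption for illustration, not a Spark API.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Illustrative sketch (not Spark internals): before distributing a collection,
// attempt to serialize a small sample so obvious failures surface as a clean
// driver-side exception rather than deep inside task dispatch.
def checkSerializable(data: Seq[AnyRef], sampleSize: Int = 10): Try[Unit] =
  Try {
    data.take(sampleSize).foreach { elem =>
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(elem) finally out.close()
    }
  }
```

The weakness is exactly the one described above: the check only inspects a sample, so an unserializable element outside the sampled prefix still slips through to the scheduler.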
In terms of solutions, it would be nice to find one that works in the general
case. Matt - did you look at all into how complicated it would be to catch
these errors at the time of serialization and propagate them up correctly such
that the task set is aborted? That would be the most general and robust
solution, although it could be complicated.
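A hypothetical sketch of that catch-and-abort approach, using plain Java serialization as a stand-in: serialize every task eagerly and fail the whole task set on the first error, instead of letting the exception escape into the scheduler's actor machinery. serializeTask and submitTasks are illustrative names, not Spark's real internals.

```scala
import java.io.{ByteArrayOutputStream, IOException, NotSerializableException, ObjectOutputStream}

// Stand-in for the scheduler's closure serializer (assumption, not Spark API).
def serializeTask(task: AnyRef): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(task) finally out.close()
  bytes.toByteArray
}

// Serialize the whole task set up front; the first failure aborts it with a
// descriptive error instead of hanging the driver.
def submitTasks(tasks: Seq[AnyRef]): Either[String, Seq[Array[Byte]]] =
  try Right(tasks.map(serializeTask))
  catch {
    case e: NotSerializableException => Left(s"Task not serializable: $e")
    case e: IOException              => Left(s"Task failed to serialize: $e")
  }
```

Because every task is serialized before anything is dispatched, this covers the general case (any RDD, any partition), at the cost of doing all serialization eagerly on the driver.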
> Spark driver hangs on sc.parallelize() if exception is thrown during
> serialization
> ----------------------------------------------------------------------------------
>
> Key: SPARK-4349
> URL: https://issues.apache.org/jira/browse/SPARK-4349
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.1.0
> Reporter: Matt Cheah
> Fix For: 1.3.0
>
>
> Executing the following in the Spark Shell will lead to the Spark Shell
> hanging after a stack trace is printed. The serializer is set to the Kryo
> serializer.
> {code}
> scala> import com.esotericsoftware.kryo.io.Input
> import com.esotericsoftware.kryo.io.Input
>
> scala> import com.esotericsoftware.kryo.io.Output
> import com.esotericsoftware.kryo.io.Output
>
> scala> class MyKryoSerializable extends com.esotericsoftware.kryo.KryoSerializable {
>   def write(kryo: com.esotericsoftware.kryo.Kryo, output: Output) {
>     throw new com.esotericsoftware.kryo.KryoException
>   }
>   def read(kryo: com.esotericsoftware.kryo.Kryo, input: Input) {
>     throw new com.esotericsoftware.kryo.KryoException
>   }
> }
> defined class MyKryoSerializable
>
> scala> sc.parallelize(Seq(new MyKryoSerializable, new MyKryoSerializable)).collect
> {code}
> A stack trace is printed during serialization as expected, but another stack
> trace is printed afterwards, indicating that the driver can't recover:
> {code}
> 14/11/11 14:10:03 ERROR OneForOneStrategy: actor name [ExecutorActor] is not unique!
> akka.actor.PostRestartException: exception post restart (class java.io.IOException)
>     at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:249)
>     at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:247)
>     at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:302)
>     at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:297)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:247)
>     at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
>     at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.actor.InvalidActorNameException: actor name [ExecutorActor] is not unique!
>     at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
>     at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
>     at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
>     at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
>     at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
>     at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
>     at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
>     at org.apache.spark.executor.Executor.<init>(Executor.scala:97)
>     at org.apache.spark.scheduler.local.LocalActor.<init>(LocalBackend.scala:53)
>     at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>     at org.apache.spark.scheduler.local.LocalBackend$$anonfun$start$1.apply(LocalBackend.scala:96)
>     at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:343)
>     at akka.actor.Props.newActor(Props.scala:252)
>     at akka.actor.ActorCell.newActor(ActorCell.scala:552)
>     at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:234)
>     ... 11 more
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)