[
https://issues.apache.org/jira/browse/SPARK-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105536#comment-14105536
]
Daniel Darabos commented on SPARK-3070:
---------------------------------------
I think this is almost certainly a duplicate of
https://issues.apache.org/jira/browse/SPARK-2878, which is FIXED, thanks to
Graham Dennis! Can you please check the repro against the fixed code to see if
this can be closed? Thanks :).
> Kryo deserialization without using the custom registrator
> ---------------------------------------------------------
>
> Key: SPARK-3070
> URL: https://issues.apache.org/jira/browse/SPARK-3070
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.0.2
> Reporter: Andras Nemeth
>
> If an RDD partition is cached on executor1 and used by a task on executor2,
> then the partition needs to be serialized and sent over. For this particular
> serialization/deserialization use case, when using Kryo, it appears that the
> custom registrator will not be used on the deserialization side. This of
> course results in some totally misleading Kryo deserialization errors.
> The cause for this behavior seems to be that the thread running this
> deserialization has a classloader which does not have the jars specified in
> the SparkConf on its classpath. So it fails to load the registrator with a
> ClassNotFoundException, but it catches the exception and happily continues
> without a registrator. (A bug in its own right, in my opinion.)
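> For illustration, here is a minimal sketch of the suspected lookup-and-swallow
> pattern (this is not Spark's actual source; RegistratorLookupSketch,
> applyRegistrator and registratorName are hypothetical names):
> {code:scala}
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> // A sketch of the suspected failure mode, not Spark's actual code.
> object RegistratorLookupSketch {
>   // registratorName stands in for the value of spark.kryo.registrator.
>   def applyRegistrator(registratorName: String, kryo: Kryo): Unit = {
>     try {
>       // The class is resolved via the current thread's context classloader.
>       // On a deserialization thread whose classloader lacks the application
>       // jars, Class.forName throws ClassNotFoundException.
>       val clazz = Class.forName(
>         registratorName, true, Thread.currentThread.getContextClassLoader)
>       clazz.newInstance.asInstanceOf[KryoRegistrator].registerClasses(kryo)
>     } catch {
>       case _: ClassNotFoundException =>
>         // The exception is swallowed and Kryo silently proceeds without the
>         // custom registrations, which later surfaces as misleading
>         // deserialization errors instead of a clear failure.
>     }
>   }
> }
> {code}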
> To reproduce, have two RDDs partitioned the same way (i.e. with the same
> partitioner) but with corresponding partitions cached on different machines,
> then join them. See below for a somewhat convoluted way to achieve this. If
> you run the program below on a Spark cluster with two workers, each with one
> core, you will be able to trigger the bug. Basically it runs two counts in
> parallel, which ensures that the two RDDs are computed in parallel, and as a
> consequence on different executors.
> {code:scala}
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.HashPartitioner
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.serializer.KryoRegistrator
> import scala.actors.Actor
>
> case class MyClass(a: Int)
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[MyClass])
>   }
> }
>
> // Counting in a separate actor lets the two counts run concurrently, so the
> // two RDDs are computed (and cached) on different executors.
> class CountActor(rdd: RDD[_]) extends Actor {
>   def act() {
>     println("Start count")
>     println(rdd.count)
>     println("Stop count")
>   }
> }
>
> object KryBugExample {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf()
>       .setMaster(args(0))
>       .setAppName("KryBugExample")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator", "MyKryoRegistrator")
>       .setJars(Seq("target/scala-2.10/krybugexample_2.10-0.1-SNAPSHOT.jar"))
>     val sc = new SparkContext(sparkConf)
>
>     // Two RDDs sharing the same partitioner, so the join below is a narrow
>     // dependency; the parallel counts cache their partitions on different
>     // single-core workers.
>     val partitioner = new HashPartitioner(1)
>     val rdd1 = sc
>       .parallelize((0 until 100000).map(i => (i, MyClass(i))), 1)
>       .partitionBy(partitioner).cache
>     val rdd2 = sc
>       .parallelize((0 until 100000).map(i => (i, MyClass(i * 2))), 1)
>       .partitionBy(partitioner).cache
>     new CountActor(rdd1).start
>     new CountActor(rdd2).start
>
>     // The join forces one executor to fetch the partition cached on the
>     // other, triggering the Kryo deserialization without the registrator.
>     println(rdd1.join(rdd2).count)
>     while (true) {}
>   }
> }
> {code}