[https://issues.apache.org/jira/browse/SPARK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14053994#comment-14053994]
Xiangrui Meng commented on SPARK-1977:
--------------------------------------
I think now I understand when it happens. We use storage level MEMORY_AND_DISK
for the user/product in/out-link RDDs, which contain BitSet objects. If the
dataset is large, these RDDs get evicted from memory to disk, and on-disk
storage requires serialization. So the easiest way to reproduce this error is
to change the storage level of inLinks/outLinks to DISK_ONLY and run with Kryo.
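Roughly, a repro along these lines should trigger it (just a sketch, not tested; the object/variable names are made up and it assumes Spark 1.0-era APIs):
{code}
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object BitSetDiskOnlyRepro {
  def main(args: Array[String]): Unit = {
    // Kryo enabled, but mutable.BitSet is NOT registered -- same situation as ALS today.
    val conf = new SparkConf()
      .setAppName("BitSetDiskOnlyRepro")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Elements carry Array[mutable.BitSet], like OutLinkBlock.shouldSend.
    val links = sc.parallelize(0 until 1000).map { i =>
      (i, Array.fill(4)(mutable.BitSet(i % 64)))
    }

    // DISK_ONLY forces every block through Kryo (de)serialization,
    // which is where the unregistered mutable.BitSet blows up.
    links.persist(StorageLevel.DISK_ONLY)
    links.count()   // write the serialized blocks to disk
    links.collect() // read them back, triggering the Kryo failure

    sc.stop()
  }
}
{code}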
[~neville] Instead of mapping mutable.BitSet to immutable.BitSet, which
introduces overhead, we can register mutable.BitSet in our MovieLensALS example
code and wait for the next Chill release. Does that sound good to you?
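For reference, the registration could look something like this (sketch only; ALSKryoRegistrator is an illustrative name, not the actual example code):
{code}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Register mutable.BitSet explicitly until a chill release covers it.
class ALSKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[scala.collection.mutable.BitSet])
  }
}
{code}
We would then point spark.kryo.registrator at that class (with spark.serializer set to org.apache.spark.serializer.KryoSerializer) in the example's SparkConf.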
> mutable.BitSet in ALS not serializable with KryoSerializer
> ----------------------------------------------------------
>
> Key: SPARK-1977
> URL: https://issues.apache.org/jira/browse/SPARK-1977
> Project: Spark
> Issue Type: Bug
> Components: MLlib
> Affects Versions: 1.0.0
> Reporter: Neville Li
> Priority: Minor
>
> OutLinkBlock in ALS.scala has an Array[mutable.BitSet] member.
> KryoSerializer uses AllScalaRegistrar from Twitter chill, but it doesn't
> register mutable.BitSet.
> Right now we have to register mutable.BitSet manually. A proper fix would be
> to use immutable.BitSet in ALS or to register mutable.BitSet in upstream chill.
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1724.0:9 failed 4 times, most recent failure: Exception failure in TID 68548 on host lon4-hadoopslave-b232.lon4.spotify.net: com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:155)
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> org.apache.spark.scheduler.Task.run(Task.scala:51)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> java.lang.Thread.run(Thread.java:662)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}