[ https://issues.apache.org/jira/browse/SPARK-755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212460#comment-14212460 ]
Evan Sparks commented on SPARK-755:
-----------------------------------
Hmm... the issue was that Kryo worked well on small objects, but crashed when
those same objects got bigger. The JIRA you link to looks more like a
classpath issue with different versions of a custom serializer. This one
seems more like a Kryo configuration issue (similar to an Akka max frame size
limit).
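For illustration, the kind of configuration knob I mean (a sketch only;
property names vary across Spark versions and none of this comes from the
ticket):
{code}
// Sketch, not from the ticket: if a fixed-size Kryo output buffer is the
// culprit, raising it is the usual workaround. Property names follow
// Spark 1.x-era documentation; check the docs for the version in use.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "64")       // initial buffer size
  .set("spark.kryoserializer.buffer.max.mb", "512")  // ceiling for large records
{code}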
At any rate - I haven't tried to run this particular example in some time, so
if someone wants to try to recreate it with an
RDD[(String, Float, Array[Float])] where all arrays are size 160,000 and
there are 1.2m elements in the RDD, be my guest; otherwise feel free to close
the ticket and we can keep our fingers crossed that this isn't in Spark 1.0+.
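A minimal repro sketch of that shape (assuming Spark 1.0+ packaging and a
cluster with enough memory; untested, so treat the names and constants as
illustrative):
{code}
// Builds an RDD[(String, Float, Array[Float])] with 1.2m rows of
// 160,000-element Float arrays, as described above, and counts it with
// Kryo enabled. The full dataset is very large (~768 GB of raw floats),
// so size the cluster accordingly or scale the constants down.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object Spark755Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-755 repro")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(0 until 1200000, 1000).map { i =>
      (i.toString, i.toFloat, Array.fill(160000)(i.toFloat))
    }

    // Serialized caching pushes every record through the Kryo write path
    // (BlockManager.dataSerialize) seen in the stack trace quoted below.
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    println(rdd.count())
    sc.stop()
  }
}
{code}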
> Kryo serialization failing - MLbase
> -----------------------------------
>
> Key: SPARK-755
> URL: https://issues.apache.org/jira/browse/SPARK-755
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Spark Core
> Affects Versions: 0.8.0
> Reporter: Evan Sparks
>
> When I turn on Kryo serialization, I get the following error as I increase
> the size of my input dataset (from ~10GB to ~100GB). The issue does not
> manifest when Kryo is turned off.
> I have code that successfully reads files and parses them into an
> {noformat}RDD[(String, Vector)]{noformat}, which can then be .count()'ed. I
> then run a .flatMap over that RDD with a function that has the following
> signature:
> {code}
> def expandData(x: (String, Vector)): Seq[(String, Float, Vector)]
> {code}
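> The implementation isn't shown in this ticket; a hypothetical body of that
> shape, for illustration only, might be:
> {code}
> // Illustrative stand-in for the real expandData (which is not in the
> // ticket): fans each (name, vector) pair out to weighted triples.
> def expandData(x: (String, Vector)): Seq[(String, Float, Vector)] = {
>   val (name, vec) = x
>   Seq(0.1f, 0.5f, 1.0f).map(w => (name, w, vec))
> }
> {code}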
> Running a .count() on the resulting RDD crashes; the stack trace of a failed
> task looks like this:
> {noformat}
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Finished TID 2024 in 23594 ms (progress: 10/1000)
> 13/05/31 00:16:53 INFO scheduler.DAGScheduler: Completed ResultTask(3, 24)
> 13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:151
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Starting task 3.0:175 as TID 2161 on slave 14: ip-10-62-199-77.ec2.internal:40850 (NODE_LOCAL)
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Serialized task 3.0:175 as 2832 bytes in 0 ms
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Lost TID 2053 (task 3.0:49)
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Loss was due to com.esotericsoftware.kryo.KryoException
> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> elements (org.mlbase.Vector)
> _3 (scala.Tuple3)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:571)
>   at spark.KryoSerializationStream.writeObject(KryoSerializer.scala:26)
>   at spark.serializer.SerializationStream$class.writeAll(Serializer.scala:63)
>   at spark.KryoSerializationStream.writeAll(KryoSerializer.scala:21)
>   at spark.storage.BlockManager.dataSerialize(BlockManager.scala:910)
>   at spark.storage.MemoryStore.putValues(MemoryStore.scala:61)
>   at spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:584)
>   at spark.storage.BlockManager.put(BlockManager.scala:580)
>   at spark.CacheManager.getOrCompute(CacheManager.scala:55)
>   at spark.RDD.iterator(RDD.scala:207)
>   at spark.scheduler.ResultTask.run(ResultTask.scala:84)
>   at spark.executor.Executor$TaskRunner.run(Executor.scala:104)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:679)
> 13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:151
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Starting task 3.0:49 as TID 2162 on slave 12: ip-10-11-46-255.ec2.internal:38878 (NODE_LOCAL)
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Serialized task 3.0:49 as 2832 bytes in 0 ms
> 13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:152
> 13/05/31 00:16:54 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_7_257 in mem
> {noformat}
> My Kryo Registrator looks like this:
> {code}
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[Vector])
>     kryo.register(classOf[String])
>     kryo.register(classOf[Float])
>     kryo.register(classOf[Tuple3[String, Float, Vector]])
>     kryo.register(classOf[Seq[Tuple3[String, Float, Vector]]])
>     kryo.register(classOf[Map[String, Vector]])
>   }
> }
> {code}
> "Vector" in this case is an org.mlbase.Vector, which in this case is a
> slightly modified version of spark.util.Vector (uses floats instead of
> Doubles).
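> For context, a minimal sketch of such a class (assumed shape, not the actual
> org.mlbase.Vector source) is:
> {code}
> // Assumed shape only: a Float-backed analogue of spark.util.Vector. The
> // "elements" field name matches the Kryo serialization trace above.
> class Vector(val elements: Array[Float]) extends Serializable {
>   def length: Int = elements.length
>   def apply(index: Int): Float = elements(index)
>   def dot(other: Vector): Float = {
>     var sum = 0.0f
>     var i = 0
>     while (i < length) { sum += elements(i) * other.elements(i); i += 1 }
>     sum
>   }
> }
> {code}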