[ https://issues.apache.org/jira/browse/SPARK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559549#comment-14559549 ]

Akshat Aranya commented on SPARK-7708:
--------------------------------------

[~joshrosen] I am tracking down a problem with Kryo serialization of closures, 
and I have distilled it to a unit test that has been failing for me:

https://github.com/coolfrood/spark/blob/topic/kryo-closure-test/core/src/test/scala/org/apache/spark/serializer/KryoClosureSerializerSuite.scala

In this test, I serialize and deserialize a class containing a 
{{SerializableWritable}} twice with the same serializer instance.  The first 
round trip works fine.  The second one fails because of state that the 
serializer retained from the first round:

{noformat}
00:00 TRACE: [kryo] Object graph complete.
00:00 TRACE: [kryo] Register class name: org.apache.spark.serializer.TestPartition (com.esotericsoftware.kryo.serializers.FieldSerializer)
00:00 TRACE: [kryo] Write class name: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Write initial object reference 0: org.apache.spark.serializer.TestPartition
00:00 DEBUG: [kryo] Write: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Write field: foobar (org.apache.spark.serializer.TestPartition)
00:00 TRACE: [kryo] Write class 31: org.apache.spark.SerializableWritable
00:00 TRACE: [kryo] Write initial object reference 1: /foo:0+100
00:00 DEBUG: [kryo] Write: /foo:0+100
00:00 TRACE: [kryo] Object graph complete.
serialized.limit=369
00:00 TRACE: [kryo] Read class name: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Read initial object reference 0: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Read field: foobar (org.apache.spark.serializer.TestPartition)
00:00 TRACE: [kryo] Read class 31: org.apache.spark.SerializableWritable
00:00 TRACE: [kryo] Read initial object reference 1: org.apache.spark.SerializableWritable
00:00 DEBUG: [kryo] Read: /foo:0+100
00:00 DEBUG: [kryo] Read: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Object graph complete.
is=/foo:0+100
00:00 TRACE: [kryo] Object graph complete.
00:00 TRACE: [kryo] Write class name: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Write initial object reference 0: org.apache.spark.serializer.TestPartition
00:00 DEBUG: [kryo] Write: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Write field: foobar (org.apache.spark.serializer.TestPartition)
00:00 TRACE: [kryo] Write class 31: org.apache.spark.SerializableWritable
00:00 TRACE: [kryo] Write initial object reference 1: /foo:0+100
00:00 DEBUG: [kryo] Write: /foo:0+100
00:00 TRACE: [kryo] Object graph complete.
serialized.limit=366
00:00 TRACE: [kryo] Read class name: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Read initial object reference 0: org.apache.spark.serializer.TestPartition
00:00 TRACE: [kryo] Read field: foobar (org.apache.spark.serializer.TestPartition)
00:00 TRACE: [kryo] Read initial object reference 1: org.apache.spark.SerializableWritable
00:00 TRACE: [kryo] Object graph complete.
[info] - serialize various things using kryo closure serializer *** FAILED *** (175 milliseconds)
[info]   com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
[info] Serialization trace:
[info] foobar (org.apache.spark.serializer.TestPartition)
[info]   at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
[info]   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
[info]   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
[info]   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
[info]   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
[info]   at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:193)
[info]   at org.apache.spark.serializer.KryoClosureSerializerSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(KryoClosureSerializerSuite.scala:62)
[info]   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
[info]   at org.apache.spark.serializer.KryoClosureSerializerSuite$$anonfun$1.apply$mcV$sp(KryoClosureSerializerSuite.scala:59)
[info]   at org.apache.spark.serializer.KryoClosureSerializerSuite$$anonfun$1.apply(KryoClosureSerializerSuite.scala:51)
[info]   at org.apache.spark.serializer.KryoClosureSerializerSuite$$anonfun$1.apply(KryoClosureSerializerSuite.scala:51)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
[info]   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1424)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info]   at org.apache.spark.serializer.KryoClosureSerializerSuite.org$scalatest$BeforeAndAfterAll$$super$run(KryoClosureSerializerSuite.scala:46)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
[info]   at org.apache.spark.serializer.KryoClosureSerializerSuite.run(KryoClosureSerializerSuite.scala:46)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:294)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:284)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)
[info]   Cause: java.io.StreamCorruptedException: invalid stream header: 79737200
[info]   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
[info]   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
[info]   at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)
[info]   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
[info]   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
[info]   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
[info]   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
[info]   at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:193)
{noformat}
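For intuition, the failure mode can be modeled in miniature. The sketch below (Python, purely illustrative: the wire format, tags, and registry mechanics are invented and are not Kryo's) shows how state accumulated by a serializer during a first round changes what the second payload contains, so that a reader without the same state can no longer decode it:

```python
# Toy model of a stateful serializer. On first use of a class it writes
# the full class name and registers it; afterwards it writes only a
# short registered id. The payload format here is invented for
# illustration and is not Kryo's actual format.

class StatefulWriter:
    def __init__(self):
        self.registry = {}  # class name -> small integer id

    def write(self, cls_name, body):
        if cls_name not in self.registry:
            # First round: emit the full class name, then remember it.
            self.registry[cls_name] = len(self.registry)
            return b"N" + cls_name.encode() + b"|" + body
        # Later rounds: emit only the registered id.
        return b"R" + bytes([self.registry[cls_name]]) + b"|" + body


class FreshReader:
    """A reader holding none of the writer's accumulated registry."""

    def read(self, payload):
        tag, rest = payload[:1], payload[1:]
        if tag == b"N":
            cls_name, body = rest.split(b"|", 1)
            return cls_name.decode(), body
        # A bare id is meaningless without the writer's registry state.
        raise ValueError("unresolvable class id: no registry state")


writer = StatefulWriter()
reader = FreshReader()

first = writer.write("TestPartition", b"/foo:0+100")
print(reader.read(first))  # first round trip decodes fine

second = writer.write("TestPartition", b"/foo:0+100")
try:
    reader.read(second)  # second payload depends on retained state
except ValueError as err:
    print("second round fails:", err)
```

The trace above has the same shape: the first round logs "Register class name" followed by a full class-name write, while the second round's read side never logs "Read class 31" before the failure.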

Perhaps you could help me with this?

> Incorrect task serialization with Kryo closure serializer
> ---------------------------------------------------------
>
>                 Key: SPARK-7708
>                 URL: https://issues.apache.org/jira/browse/SPARK-7708
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.2
>            Reporter: Akshat Aranya
>
> I've been investigating the use of Kryo for closure serialization with Spark 
> 1.2, and it seems like I've hit upon a bug:
> When a task is serialized before scheduling, the following log message is 
> generated:
> [info] o.a.s.s.TaskSetManager - Starting task 124.1 in stage 0.0 (TID 342, 
> <host>, PROCESS_LOCAL, 302 bytes)
> This message comes from TaskSetManager, which serializes the task using the 
> closure serializer.  Before the message is sent out, the TaskDescription 
> (which includes the original task as a byte array) is serialized again into 
> a byte array with the closure serializer.  I added a log message for this in 
> CoarseGrainedSchedulerBackend, which produces the following output:
> [info] o.a.s.s.c.CoarseGrainedSchedulerBackend - 124.1 size=132
> The serialized size of the TaskDescription (132 bytes) turns out to be 
> _smaller_ than the serialized task that it contains (302 bytes), which 
> implies that TaskDescription.buffer is not getting serialized correctly.
> On the executor side, deserialization produces a null value for 
> TaskDescription.buffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
