[ 
https://issues.apache.org/jira/browse/SPARK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14564038#comment-14564038
 ] 

Josh Rosen commented on SPARK-7708:
-----------------------------------

Two hours later and I've now found where the state was hiding.  I discovered 
this using the following isolated test project, which explains the 3-byte size 
difference: https://gist.github.com/JoshRosen/14ba69ef53af53ef2839

Intuitively, you might think that it's in {{Output}} because using a new 
{{Output}} solves the issue.  However, it turns out that the state was hiding 
inside Kryo's {{JavaSerializer}} class:

{code}
public class JavaSerializer extends Serializer {
  // Cached across write() calls: this is the hidden state.
  private ObjectOutputStream objectStream;
  private Output lastOutput;

  public JavaSerializer() {
  }

  public void write(Kryo kryo, Output output, Object object) {
    try {
      if (output != this.lastOutput) {
        // New Output: open a fresh ObjectOutputStream, whose constructor
        // writes the 4-byte stream header.
        this.objectStream = new ObjectOutputStream(output);
        this.lastOutput = output;
      } else {
        // Same Output as last time: only a 1-byte reset marker is written.
        this.objectStream.reset();
      }

      this.objectStream.writeObject(object);
      this.objectStream.flush();
    } catch (Exception e) {
      throw new KryoException("Error during Java serialization.", e);
    }
  }

[...]
{code}

When you pass a new Output, it opens a new ObjectOutputStream, whose 
constructor writes a stream header; when you reuse the Output, it only writes 
a reset marker.  The header is two shorts (four bytes), whereas the reset 
marker is a single byte, which accounts for the 3-byte difference.
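The size difference can be reproduced with plain java.io, no Kryo involved 
(a minimal sketch; the class name is mine):

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class HeaderVsReset {
    public static void main(String[] args) throws IOException {
        // A fresh ObjectOutputStream writes the 4-byte stream header
        // (STREAM_MAGIC + STREAM_VERSION, two shorts) in its constructor.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(buf);
        oos.flush();
        int headerBytes = buf.size();

        // Reusing the same stream: reset() emits only the 1-byte TC_RESET marker.
        int before = buf.size();
        oos.reset();
        oos.flush();
        int resetBytes = buf.size() - before;

        System.out.println("header=" + headerBytes
                + " reset=" + resetBytes
                + " difference=" + (headerBytes - resetBytes));
    }
}
{code}

Running this prints header=4 reset=1 difference=3, matching the sizes above.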

I'm not sure whether this is a bug in Kryo or whether our re-use of Output is 
unsafe.  I'll email the developers to ask.
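For what it's worth, the bytes written after a reset() are not independently 
deserializable, which is consistent with the corruption we see on the executor 
side.  A minimal sketch with plain java.io, not Kryo (class name is mine):

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.util.Arrays;

public class ResetNotStandalone {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(buf);
        oos.writeObject("first");
        oos.flush();

        // The second write reuses the stream, as JavaSerializer does when it
        // sees the same Output: only TC_RESET precedes the object bytes.
        int mark = buf.size();
        oos.reset();
        oos.writeObject("second");
        oos.flush();
        byte[] secondAlone = Arrays.copyOfRange(buf.toByteArray(), mark, buf.size());

        // A fresh ObjectInputStream expects the 4-byte header and rejects
        // bytes that start with the reset marker (0x79) instead.
        try {
            new ObjectInputStream(new ByteArrayInputStream(secondAlone)).readObject();
            System.out.println("unexpectedly succeeded");
        } catch (StreamCorruptedException e) {
            System.out.println("standalone read failed: StreamCorruptedException");
        }
    }
}
{code}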

> Incorrect task serialization with Kryo closure serializer
> ---------------------------------------------------------
>
>                 Key: SPARK-7708
>                 URL: https://issues.apache.org/jira/browse/SPARK-7708
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.2
>            Reporter: Akshat Aranya
>
> I've been investigating the use of Kryo for closure serialization with Spark 
> 1.2, and it seems like I've hit upon a bug:
> When a task is serialized before scheduling, the following log message is 
> generated:
> [info] o.a.s.s.TaskSetManager - Starting task 124.1 in stage 0.0 (TID 342, 
> <host>, PROCESS_LOCAL, 302 bytes)
> This message comes from TaskSetManager, which serializes the task using the 
> closure serializer.  Before the message is sent out, the TaskDescription 
> (which includes the original serialized task as a byte array) is serialized 
> again into a byte array with the closure serializer.  I added a log message 
> for this in CoarseGrainedSchedulerBackend, which produces the following 
> output:
> [info] o.a.s.s.c.CoarseGrainedSchedulerBackend - 124.1 size=132
> The serialized size of the TaskDescription (132 bytes) turns out to be 
> _smaller_ than the serialized task it contains (302 bytes). This implies 
> that TaskDescription.buffer is not getting serialized correctly.
> On the executor side, the deserialization produces a null value for 
> TaskDescription.buffer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
