[ https://issues.apache.org/jira/browse/SPARK-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527575#comment-14527575 ]

Frederick Reiss commented on SPARK-7043:
----------------------------------------

I did some looking into this issue. Here's a summary of what I've found:

The interpreter rewrites each command into a pair of generated classes, one for 
the "read" phase and one for the "eval" phase of executing the statement.

For the command:
{{case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()}}
this rewrite process produces the following class (simplified here by removing 
empty constructors) to cover the "read" phase:

{noformat}
class $read extends Serializable {
  class $iwC extends Serializable {
    val $VAL10 = $line3.$read.INSTANCE;
    import $VAL10.$iw.$iw.sc;
    class $iwC extends Serializable {
      case class Foo extends scala.Product with scala.Serializable {
        <caseaccessor> <paramaccessor> val i: Int = _;
      };
      val ret = sc.parallelize(1.to(100).map(Foo), 10).collect()
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}
{noformat}
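
To see why this nesting matters for serialization, here is a small standalone 
sketch (my own toy classes, not the actual generated code) that mimics the 
wrapping above. Because Scala compiles inner classes with a reference back to 
their enclosing instance, serializing an instance of the innermost case class 
drags the whole chain of wrapper instances along with it:

{noformat}
import java.io._

class Outer extends Serializable {        // plays the role of $read.$iwC
  val captured = new Array[Byte](1024)    // stands in for val $VAL10 = $line3.$read.INSTANCE
  class Inner extends Serializable {      // plays the role of the nested $iwC
    case class Foo(i: Int)                // the user's case class
  }
  val inner = new Inner
}

object NestingSketch {
  def main(args: Array[String]): Unit = {
    val outer = new Outer
    val foo = new outer.inner.Foo(42)
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(foo)
    // Much larger than a bare Int: Foo's synthetic outer reference pulled in
    // the Inner and Outer instances, including the 1 KB `captured` array.
    println(s"serialized size: ${bytes.size()} bytes")
  }
}
{noformat}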

Note these two lines:
{{val $VAL10 = $line3.$read.INSTANCE;}}
{{import $VAL10.$iw.$iw.sc;}}
These lines pull the symbol {{sc}} into the scope of the class {{$read.$iwC}}. 
They also happen to give the class {{$read.$iwC}} a field of type 
{{$line3.$read.$iwC}}. I haven't been able to get at the definition of the 
generated class or object {{$line3}}, but it appears that {{$line3}} is a 
rewritten version of the Scala code
{noformat}
@transient val sc = {
  val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
  println("Spark context available as sc.")
  _sc
}
{noformat}
which runs whenever the interpreter starts up.
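
I haven't seen the actual generated wrapper for that code, but following the 
same pattern as the {{$read}} class above, my guess (purely a reconstruction on 
my part) is that {{$line3}} ends up looking roughly like this:

{noformat}
class $read extends Serializable {
  class $iwC extends Serializable {
    class $iwC extends Serializable {
      @transient val sc = {
        val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
        println("Spark context available as sc.")
        _sc
      }
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}
object $read {
  val INSTANCE = new $read()
}
{noformat}

That would explain both the {{$line3.$read.INSTANCE}} reference and the 
{{import $VAL10.$iw.$iw.sc}} path in the generated code above.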

The generated code for the current line gets further bundled inside a generated 
object or class (I'm not sure which) that represents the current line's state 
transitions. In the trace I'm looking at, this object/class is called 
{{$line19}}.

As a result, the case class {{Foo}} from the command line turns into the case 
class {{$line19.$read.$iwC.$iwC.Foo}}, which the Scala compiler turns into a 
Java inner class by the same name.
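
At the bytecode level, the enclosing-instance link shows up as a synthetic 
{{$outer}} field on the inner class. A quick way to see it outside the REPL 
(the class names here are my own, not from the trace):

{noformat}
class Wrapper extends Serializable {   // stands in for $line19.$read.$iwC.$iwC
  case class Foo(i: Int)
}

object OuterFieldCheck {
  def main(args: Array[String]): Unit = {
    // Every Wrapper#Foo instance carries a compiler-generated reference back
    // to its enclosing Wrapper instance; javap shows the same field.
    classOf[Wrapper#Foo].getDeclaredFields.foreach(f => println(f.getName))
    // prints something like: i, $outer
  }
}
{noformat}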

The call to {{sc.parallelize(1.to(100).map(Foo), 10)}} causes Spark to 
serialize an array of instances of {{$line19.$read.$iwC.$iwC.Foo}}. The 
serializer serializes each of these inner-class objects along with its 
enclosing instances. During this serialization process, the serializer skips 
the pointer to the SparkContext, since that pointer is marked {{@transient}}.
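
The skip itself is just standard {{@transient}} handling, illustrated here with 
plain Java serialization and my own toy classes (as far as I can tell, Kryo's 
FieldSerializer skips transient fields as well): the write succeeds even though 
the field's type isn't serializable at all, because the field is simply never 
written.

{noformat}
import java.io._

class FakeContext                        // stands in for SparkContext; not Serializable
class Holder extends Serializable {
  @transient val sc = new FakeContext    // skipped by the serializer, like the REPL's sc
  val data = Seq(1, 2, 3)
}

object TransientSkip {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    // Does not throw NotSerializableException: the transient sc is never written.
    new ObjectOutputStream(bytes).writeObject(new Holder)
    println(s"wrote ${bytes.size()} bytes")
  }
}
{noformat}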

Inside of the call to {{collect()}}, the deserializer reconstructs instances of 
{{$line19.$read.$iwC.$iwC.Foo}}. Before creating such an instance, the 
deserializer needs to create an enclosing instance of 
{{$line19.$read.$iwC.$iwC}}, which needs an enclosing instance of 
{{$line19.$read.$iwC}}. The class {{$line19.$read.$iwC}} contains a field 
({{val $VAL10 = $line3.$read.INSTANCE}}) that points to the {{$line3.$read}} 
instance, whose nested {{$iwC}} wrappers in turn contain the transient field 
{{sc}} somewhere underneath.
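
The read side of that chain can be seen with the same kind of toy classes: 
deserializing an inner-class instance silently rebuilds a fresh enclosing 
instance for it.

{noformat}
import java.io._

class Enclosing extends Serializable {   // stands in for one of the $iwC wrappers
  case class Foo(i: Int)
}

object RebuildOuter {
  def main(args: Array[String]): Unit = {
    val enc = new Enclosing
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(enc.Foo(7))

    val back = new ObjectInputStream(
      new ByteArrayInputStream(bytes.toByteArray)).readObject()
    val outerField = back.getClass.getDeclaredField("$outer")
    outerField.setAccessible(true)
    // A brand-new Enclosing instance was created during deserialization, just
    // as the worker has to rebuild the $iwC chain before it can build a Foo.
    println(outerField.get(back) ne enc)   // true
  }
}
{noformat}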

At some point during initializing the object of type {{$line3.$read.$iwC}}, the 
Kryo deserializer makes a call to the static initializer for the transient 
field "sc". This static initializer calls 
{{org.apache.spark.repl.Main.interp.createSparkContext()}}, which attempts and 
fails to create a second SparkContext. The deserialization process fails with 
an exception.

I'm not sure exactly why the Java deserializer doesn't crash in the same way. I 
suspect that, when deserializing the transient field "sc", Java's deserializer 
sets that field to null and moves on.
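
Here is a small sketch of the difference I have in mind, using plain Kryo 
directly rather than Spark's wrapping of it (so the details may not match 
Spark's configuration exactly); the side-effecting initializer stands in for 
{{createSparkContext()}}:

{noformat}
import java.io._
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

class LineWrapper extends Serializable {
  // Stand-in for the REPL's  @transient val sc = { ... createSparkContext() ... }
  @transient val sc = { println("createSparkContext() called"); new Object }
  val x = 1
}

object InitializerReplay {
  def main(args: Array[String]): Unit = {
    // Kryo: the default FieldSerializer instantiates via the no-arg constructor,
    // so the field initializer (and its side effect) runs again on read.
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)   // older Kryo versions default to this anyway
    val kbytes = new ByteArrayOutputStream()
    val out = new Output(kbytes)
    kryo.writeObject(out, new LineWrapper)                // prints once, from `new`
    out.close()
    kryo.readObject(new Input(kbytes.toByteArray), classOf[LineWrapper])  // prints again

    // Java serialization: the constructor of a Serializable class is not run on
    // read, so nothing prints here and the transient field is left null.
    val jbytes = new ByteArrayOutputStream()
    new ObjectOutputStream(jbytes).writeObject(new LineWrapper)           // prints once
    val back = new ObjectInputStream(
      new ByteArrayInputStream(jbytes.toByteArray)).readObject().asInstanceOf[LineWrapper]
    println(back.sc)                                      // null
  }
}
{noformat}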

I'm not sure which way of initializing the transient field is correct. The 
serialization spec 
([http://docs.oracle.com/javase/7/docs/platform/serialization/spec/serialTOC.html])
 is vague about default values for transient fields.


> KryoSerializer cannot be used with REPL to interpret code in which case class 
> definition and its shipping are in the same line
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7043
>                 URL: https://issues.apache.org/jira/browse/SPARK-7043
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell
>    Affects Versions: 1.3.1
>         Environment: Ubuntu 14.04, no hadoop
>            Reporter: Peng Cheng
>            Priority: Minor
>              Labels: classloader, kryo
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When deploying Spark-shell with the
> "spark.serializer=org.apache.spark.serializer.KryoSerializer" option,
> Spark-shell cannot execute the following code (on one line):
>     case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 
> 10).collect()
> This problem does not occur with the JavaSerializer, or when the code is split 
> into 2 lines. The only possible explanation is that KryoSerializer is using a 
> ClassLoader that is not registered as a subsidiary ClassLoader of the one in 
> the REPL.
> A "dirty" fix would be to just break the input at semicolons, but it is better 
> to fix the ClassLoader to avoid other liabilities.


