[ 
https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119524#comment-16119524
 ] 

Lukas Rytz edited comment on SPARK-14540 at 8/9/17 7:37 AM:
------------------------------------------------------------

[~joshrosen] the closure in your last example is serializable with Scala 2.12.3.
The anonymous class still takes an outer parameter, but since
https://github.com/scala/scala/pull/5099 the compiler performs an analysis that
detects that the outer reference is unused and replaces the argument with
{{null}}. Because the lambda body method then no longer refers to {{C.this}},
it becomes static.

Example code
{code}
class C {
  def foo(f: String => Object) = 0
  def bar = {
    foo { x: Any => new Object{} }
  }
}
{code}

{noformat}
$> scalac -version
Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

$> scalac Test.scala -Xprint:cleanup,delambdafy
[[syntax trees at end of                   cleanup]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      ((x: Object) => C.this.$anonfun|$1(x))
    });
    final <artifact> private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}

[[syntax trees at end of                delambdafy]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      $anonfun()
    });
    final <static> <artifact> def $anonfun|$1(x: Object): Object = new <$anon: Object>(null);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}

$> javap -v -cp . C
...
  public static final java.lang.Object $anonfun$bar$1(java.lang.Object);
    descriptor: (Ljava/lang/Object;)Ljava/lang/Object;
    flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC
    Code:
      stack=3, locals=1, args_size=1
         0: new           #10                 // class C$$anon$1
         3: dup
         4: aconst_null
         5: invokespecial #51                 // Method C$$anon$1."<init>":(LC;)V
         8: areturn
...
{noformat}


was (Author: lrytz):
[~joshrosen] the closure in your last example is serializable with 2.12.3. The 
anonymous class takes an outer parameter, but since 
https://github.com/scala/scala/pull/5099 the compiler implements an analysis to 
see that it's not used, and replaces the argument with {{null}}.

Example code
{code}
class C {
  def foo(f: String => Object) = 0
  def bar = {
    foo { x: Any => new Object{} }
  }
}
{code}

{noformat}
$> scalac -version
Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, 
Inc.

$> scalac Test.scala -Xprint:cleanup,delambdafy
[[syntax trees at end of                   cleanup]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      ((x: Object) => C.this.$anonfun|$1(x))
    });
    final <artifact> private[this] def $anonfun|$1(x: Object): Object = new 
<$anon: Object>(C.this);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}

[[syntax trees at end of                delambdafy]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      $anonfun()
    });
    final <static> <artifact> def $anonfun|$1(x: Object): Object = new <$anon: 
Object>(null);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}

$> javap -v -cp . C
...
  public static final java.lang.Object $anonfun$bar$1(java.lang.Object);
    descriptor: (Ljava/lang/Object;)Ljava/lang/Object;
    flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC
    Code:
      stack=3, locals=1, args_size=1
         0: new           #10                 // class C$$anon$1
         3: dup
         4: aconst_null
         5: invokespecial #51                 // Method 
C$$anon$1."<init>":(LC;)V
         8: areturn
...
{noformat}

> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> ----------------------------------------------------------------
>
>                 Key: SPARK-14540
>                 URL: https://issues.apache.org/jira/browse/SPARK-14540
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>            Reporter: Josh Rosen
>
> Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running 
> ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures:
> {code}
> [info] - toplevel return statements in closures are identified at cleaning 
> time *** FAILED *** (32 milliseconds)
> [info]   Expected exception 
> org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no 
> exception was thrown. (ClosureCleanerSuite.scala:57)
> {code}
> and
> {code}
> [info] - user provided closures are actually cleaned *** FAILED *** (56 
> milliseconds)
> [info]   Expected ReturnStatementInClosureException, but got 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not 
> serializable: java.io.NotSerializableException: java.lang.Object
> [info]        - element of array (index: 0)
> [info]        - array (class "[Ljava.lang.Object;", size: 1)
> [info]        - field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]        - object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class 
> org.apache.spark.util.TestUserClosuresActuallyCleaned$, 
> functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I,
>  implementation=invokeStatic 
> org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I,
>  instantiatedMethodType=(I)I, numCaptured=1])
> [info]        - element of array (index: 0)
> [info]        - array (class "[Ljava.lang.Object;", size: 1)
> [info]        - field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]        - object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, 
> functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;,
>  implementation=invokeStatic 
> org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  
> instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  numCaptured=1])
> [info]        - field (class "org.apache.spark.rdd.MapPartitionsRDD", name: 
> "f", type: "interface scala.Function3")
> [info]        - object (class "org.apache.spark.rdd.MapPartitionsRDD", 
> MapPartitionsRDD[2] at apply at Transformer.scala:22)
> [info]        - field (class "scala.Tuple2", name: "_1", type: "class 
> java.lang.Object")
> [info]        - root object (class "scala.Tuple2", (MapPartitionsRDD[2] at 
> apply at 
> Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)).
> [info]   This means the closure provided by user is not actually cleaned. 
> (ClosureCleanerSuite.scala:78)
> {code}
> We'll need to figure out a closure cleaning strategy which works for 2.12 
> lambdas.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to