[ https://issues.apache.org/jira/browse/SPARK-20080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nick Hryhoriev updated SPARK-20080:
-----------------------------------
    Description: 
When I try to use or initialize an org.slf4j.Logger inside foreachPartition, where the foreachPartition call is extracted into a trait method that is called from foreachRDD, the function passed to foreachPartition is never executed and no exception is raised.
Tested with Spark in local and YARN mode.
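The shape described above can be sketched without Spark. This is a minimal sketch, assuming Scala 2.12 or 2.13; `FakeLogger`, `TraitWithLogger`, `ReproduceMain`, `CaptureDemo` and `serializationError` are hypothetical names standing in for `org.slf4j.Logger`, the trait, the main object and Spark's check that a closure can be Java-serialized. Because the partition function reads a logger that is a member of the trait, it captures the enclosing instance, and serializing it fails with `NotSerializableException`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.slf4j.Logger, which is not Serializable.
class FakeLogger {
  def info(msg: String): Unit = println(msg)
}

// Mirrors the shape of the trait in the report: the partition function is
// built in a trait method and refers to a logger that is a trait member.
trait TraitWithLogger {
  val logger: FakeLogger = new FakeLogger

  def partitionFn: Iterator[Int] => Unit =
    (it: Iterator[Int]) => it.foreach(x => logger.info(s"value $x"))
}

// Hypothetical main class; like ReproduceBugMain$ it is not Serializable.
class ReproduceMain extends TraitWithLogger

object CaptureDemo {
  // Rough analogue of the check Spark runs before shipping a closure:
  // Java-serialize the function object and report the failure, if any.
  def serializationError(f: AnyRef): Option[Throwable] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      None
    } catch {
      case e: NotSerializableException => Some(e)
    }

  def main(args: Array[String]): Unit = {
    val fn = new ReproduceMain().partitionFn
    // `logger` is read through `this`, so the closure captures the enclosing
    // ReproduceMain instance, which cannot be serialized.
    serializationError(fn) match {
      case Some(e) => println(s"not serializable: $e")
      case None    => println("serializable")
    }
  }
}
```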

The code can be found on [GitHub|https://github.com/GrigorievNick/Spark2_1TraitLoggerSerialisationBug/tree/9da55393850df9fe19f5ff3e63b47ec2d1f67e17]. There are two main classes that demonstrate the problem.

If I run the same code as a batch job, I get the expected exception:
{code:java}
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at TraitWithMethod$class.executeForEachpartitoin(TraitWithMethod.scala:12)
    at ReproduceBugMain$.executeForEachpartitoin(ReproduceBugMain.scala:7)
    at ReproduceBugMain$.main(ReproduceBugMain.scala:14)
    at ReproduceBugMain.main(ReproduceBugMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: ReproduceBugMain$
Serialization stack:
    - object not serializable (class: ReproduceBugMain$, value: ReproduceBugMain$@3935e9a8)
    - field (class: TraitWithMethod$$anonfun$executeForEachpartitoin$1, name: $outer, type: interface TraitWithMethod)
    - object (class TraitWithMethod$$anonfun$executeForEachpartitoin$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 18 more
{code}
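The serialization stack shows the cause: the anonymous function built in `executeForEachpartitoin` holds an `$outer` reference to `ReproduceBugMain$`, which is not serializable. One common workaround is to create the logger inside the partition function, so the closure no longer references the enclosing object. The sketch below assumes Scala 2.12 or 2.13, uses the hypothetical names `StubLogger`, `TraitWithLocalLogger`, `FixedMain` and `FixDemo`, and uses plain Java serialization as a stand-in for Spark's check. It only avoids the serialization failure; it does not explain why the streaming job reports no exception.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.slf4j.Logger (not Serializable).
class StubLogger {
  def info(msg: String): Unit = println(msg)
}

trait TraitWithLocalLogger {
  // The closure touches no member of the trait: the logger is created inside
  // the function, i.e. once per partition where the function runs, so the
  // enclosing instance does not need to be serialized.
  def partitionFn: Iterator[Int] => Unit =
    (it: Iterator[Int]) => {
      val logger = new StubLogger
      it.foreach(x => logger.info(s"value $x"))
    }
}

// Still not Serializable, just like ReproduceBugMain$.
class FixedMain extends TraitWithLocalLogger

object FixDemo {
  // Java-serializes the function, roughly what Spark's closure check does.
  def isJavaSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val fn = new FixedMain().partitionFn
    println(s"serializable: ${isJavaSerializable(fn)}")
    // Running it locally logs "value 1" and "value 2".
    fn(Iterator(1, 2))
  }
}
```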

There are two commits on GitHub: the initial one, linked above, contains the streaming example, and the last one contains the batch job example.

  was:
When I try to use or initialize an org.slf4j.Logger inside foreachPartition, the function passed to foreachPartition is never executed and no exception is raised. Tested with Spark in local and YARN mode.

The code can be found on GitHub. There are two main classes that demonstrate the problem.

If I run the same code as a batch job, I get the expected exception.


> Spark streaming app does not throw serialisation exception in foreachRDD
> --------------------------------------------------------------------
>
>                 Key: SPARK-20080
>                 URL: https://issues.apache.org/jira/browse/SPARK-20080
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>         Environment: local Spark, and YARN from Apache Bigtop 1.1.0
>            Reporter: Nick Hryhoriev
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
