[ https://issues.apache.org/jira/browse/BEAM-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kyle Winkelman updated BEAM-6771:
---------------------------------
    Priority: Blocker  (was: Critical)

> Spark Runner Fails on Certain Versions of Spark 2.X
> ---------------------------------------------------
>
>                 Key: BEAM-6771
>                 URL: https://issues.apache.org/jira/browse/BEAM-6771
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.11.0
>            Reporter: Kyle Winkelman
>            Priority: Blocker
>
> While upgrading to Beam 2.11.0, I ran into the exception at the bottom of this issue when running a pipeline on the Spark runner; the same pipeline worked on Beam 2.9.0. My cluster uses Spark 2.2.1.
> Related issues:
> SPARK-23697 (evidence that equals must be implemented for the values being accumulated; see the sketch below)
> BEAM-1920 (in PR#3808, equals was implemented on MetricsContainerStepMap to get the Spark runner working on Spark 2.x)
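> For context, here is a minimal, self-contained Java sketch of the zero-value check behind the assertion in the stack trace below. The class names are toy stand-ins, not the real Beam/Spark types: Spark's AccumulatorV2#writeReplace calls copyAndReset() and asserts that the copy "is zero", and for the metrics accumulator that boils down to an equals comparison between two empty MetricsContainerStepMaps.
> {code:java}
> import java.util.concurrent.ConcurrentHashMap;
> 
> public class ZeroCheckSketch {
> 
>   // Toy stand-in for MetricsContainerStepMap with the equals() added by PR#3808.
>   static class StepMap {
>     final ConcurrentHashMap<String, Object> metricsContainers = new ConcurrentHashMap<>();
> 
>     @Override
>     public boolean equals(Object o) {
>       return o instanceof StepMap
>           && metricsContainers.equals(((StepMap) o).metricsContainers);
>     }
> 
>     @Override
>     public int hashCode() {
>       return metricsContainers.hashCode();
>     }
>   }
> 
>   public static void main(String[] args) {
>     StepMap copy = new StepMap();                // what copyAndReset() hands back
>     boolean isZero = copy.equals(new StepMap()); // the "is zero" test, sketched
>     if (!isZero) {                               // Spark's assertion, sketched
>       throw new AssertionError("assertion failed: copyAndReset must return a zero value copy");
>     }
>     System.out.println("isZero = " + isZero);    // true while everything compared implements equals
>   }
> }
> {code}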
> My analysis has led me to believe that BEAM-6138 is the cause of this issue.
> Before that change, on versions of Spark affected by SPARK-23697, Spark would create a new MetricsContainerStepMap (via copyAndReset) and verify that the copied-and-reset instance (the one serialized for distribution) is equal to the initial empty MetricsContainerStepMap that was passed in. That effectively checked whether two empty ConcurrentHashMaps were equal, which is true.
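> Nothing Beam-specific is needed to demonstrate that comparison; ConcurrentHashMap inherits a contents-based equals from AbstractMap, so two empty instances compare equal (variable names here are illustrative):
> {code:java}
> import java.util.concurrent.ConcurrentHashMap;
> 
> public class EmptyMapEquality {
>   public static void main(String[] args) {
>     // Stand-ins for the copied-and-reset map and the initial empty map.
>     ConcurrentHashMap<String, Object> copiedAndReset = new ConcurrentHashMap<>();
>     ConcurrentHashMap<String, Object> initialEmpty = new ConcurrentHashMap<>();
>     // AbstractMap#equals compares contents, not identity, so this prints true
>     // and the "zero value copy" assertion passes.
>     System.out.println(copiedAndReset.equals(initialEmpty));
>   }
> }
> {code}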
> After that change, the same versions of Spark are effectively checking whether two empty ConcurrentHashMaps are equal _*AND*_ whether two different instances of MetricsContainerImpl are equal. Because MetricsContainerImpl doesn't implement equals, that comparison falls back to reference equality and results in false, tripping the assertion.
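> The failure mode is easy to reproduce with the same toy stand-ins (the exact field layout of the real post-BEAM-6138 class may differ; this only models the comparison I described):
> {code:java}
> import java.util.Objects;
> import java.util.concurrent.ConcurrentHashMap;
> 
> public class BrokenZeroCheckSketch {
> 
>   // Toy stand-in for MetricsContainerImpl: no equals() override, so two
>   // instances only compare equal if they are the same object.
>   static class ContainerImpl {}
> 
>   // Toy stand-in for MetricsContainerStepMap after BEAM-6138, where equals()
>   // now also has to compare MetricsContainerImpl instances. The field below
>   // is hypothetical and exists only to model that comparison.
>   static class StepMap {
>     final ConcurrentHashMap<String, Object> metricsContainers = new ConcurrentHashMap<>();
>     final ContainerImpl container = new ContainerImpl();
> 
>     @Override
>     public boolean equals(Object o) {
>       if (!(o instanceof StepMap)) {
>         return false;
>       }
>       StepMap that = (StepMap) o;
>       return metricsContainers.equals(that.metricsContainers) // true: both empty
>           && Objects.equals(container, that.container);       // false: reference compare
>     }
> 
>     @Override
>     public int hashCode() {
>       return metricsContainers.hashCode();
>     }
>   }
> 
>   public static void main(String[] args) {
>     // Two freshly built "empty" step maps are no longer equal, so the copy
>     // no longer looks like a zero value and Spark's writeReplace throws
>     // "assertion failed: copyAndReset must return a zero value copy".
>     System.out.println(new StepMap().equals(new StepMap())); // false
>   }
> }
> {code}
> Giving ContainerImpl a value-based equals()/hashCode() in this sketch flips the result back to true, which is why I expect an equals implementation on MetricsContainerImpl to resolve the regression.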
> I believe BEAM-6546 will fix this issue, but I wanted to raise a red flag. I 
> am also hoping someone can verify my analysis.
> {noformat}
> ERROR ApplicationMaster: User class threw exception: java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>       at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:54)
>       at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>       at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:98)
>       at com.optum.analyticstore.execution.Exec.run(Exec.java:276)
>       at com.optum.analyticstore.execution.Exec.main(Exec.java:364)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>       at scala.Predef$.assert(Predef.scala:170)
>       at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:163)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1218)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
>       at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
>       at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
>       at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:388)
>       at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:339)
>       at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:440)
>       at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:428)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>       at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>       at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>       at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:224)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
