[ https://issues.apache.org/jira/browse/BEAM-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kyle Winkelman updated BEAM-6771:
---------------------------------
Priority: Blocker (was: Critical)
> Spark Runner Fails on Certain Versions of Spark 2.X
> ---------------------------------------------------
>
> Key: BEAM-6771
> URL: https://issues.apache.org/jira/browse/BEAM-6771
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.11.0
> Reporter: Kyle Winkelman
> Priority: Blocker
>
> When upgrading to Beam 2.11.0, I ran into the exception at the bottom of this
> issue while running a pipeline on the Spark Runner; the same pipeline worked
> on Beam 2.9.0. My cluster uses Spark 2.2.1.
> Related Issues:
> SPARK-23697 (shows that equals must be implemented for items being
> accumulated)
> BEAM-1920 (in PR#3808, equals was implemented on MetricsContainerStepMap to
> get the Spark runner working on Spark 2.X)
> My analysis has led me to believe that BEAM-6138 is the cause of this
> issue.
> Before this change, versions of Spark affected by SPARK-23697 would create a
> new MetricsContainerStepMap and verify that the copied-and-reset instance
> (the one serialized for distribution) is equal to the initial empty
> MetricsContainerStepMap that is passed in. This effectively checked whether
> two empty ConcurrentHashMaps were equal, which returns true.
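>
> A self-contained demo of why that returned true (nothing Beam-specific
> here, just the Map contract: equality is content-based, not identity-based):
> {code:java}
> import java.util.concurrent.ConcurrentHashMap;
>
> public class EmptyMapEquality {
>   public static void main(String[] args) {
>     // Map.equals compares contents, not identity, so two distinct empty
>     // ConcurrentHashMap instances compare equal.
>     ConcurrentHashMap<String, Object> a = new ConcurrentHashMap<>();
>     ConcurrentHashMap<String, Object> b = new ConcurrentHashMap<>();
>     System.out.println(a.equals(b)); // true -> zero-copy assertion passes
>   }
> }
> {code}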
> After this change, versions of Spark affected by SPARK-23697 effectively
> check whether two empty ConcurrentHashMaps are equal _*AND*_ whether two
> different instances of MetricsContainerImpl are equal. Because
> MetricsContainerImpl doesn't implement equals, the second comparison falls
> back to reference identity and returns false.
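>
> Again as a sketch (the class and field names here are stand-ins I made up,
> not Beam's real ones), the post-BEAM-6138 comparison boils down to:
> {code:java}
> import java.util.Objects;
> import java.util.concurrent.ConcurrentHashMap;
>
> public class StepMapEquality {
>   // Stand-in for MetricsContainerImpl: no equals()/hashCode() override,
>   // so Object.equals (reference identity) is used.
>   static class ContainerImpl {}
>
>   // Rough stand-in for MetricsContainerStepMap, assuming BEAM-6138 added a
>   // container instance that participates in equals() alongside the map.
>   static class StepMap {
>     final ConcurrentHashMap<String, ContainerImpl> containers = new ConcurrentHashMap<>();
>     final ContainerImpl unboundContainer = new ContainerImpl();
>
>     @Override
>     public boolean equals(Object o) {
>       if (!(o instanceof StepMap)) {
>         return false;
>       }
>       StepMap that = (StepMap) o;
>       return containers.equals(that.containers) // true: both maps are empty
>           && unboundContainer.equals(that.unboundContainer); // false: identity
>     }
>
>     @Override
>     public int hashCode() {
>       return Objects.hash(containers, unboundContainer);
>     }
>   }
>
>   public static void main(String[] args) {
>     System.out.println(new StepMap().equals(new StepMap())); // false
>   }
> }
> {code}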
> I believe BEAM-6546 will fix this issue, but I wanted to raise a red flag. I
> am also hoping someone can verify my analysis.
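>
> If BEAM-6546 does end up giving MetricsContainerImpl a content-based
> equals, the shape of the fix would be roughly the following (the field is
> hypothetical and simplified; the real container tracks counters,
> distributions, and gauges per step):
> {code:java}
> import java.util.Objects;
>
> // Simplified, hypothetical container with a content-based equals()/hashCode(),
> // so that two "empty" instances compare equal and the zero check passes.
> class ContainerImpl {
>   private final String stepName;
>
>   ContainerImpl(String stepName) {
>     this.stepName = stepName;
>   }
>
>   @Override
>   public boolean equals(Object o) {
>     if (this == o) {
>       return true;
>     }
>     if (!(o instanceof ContainerImpl)) {
>       return false;
>     }
>     return Objects.equals(stepName, ((ContainerImpl) o).stepName);
>   }
>
>   @Override
>   public int hashCode() {
>     return Objects.hash(stepName);
>   }
> }
> {code}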
> {noformat}
> ERROR ApplicationMaster: User class threw exception: java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> 	at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:54)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:98)
> 	at com.optum.analyticstore.execution.Exec.run(Exec.java:276)
> 	at com.optum.analyticstore.execution.Exec.main(Exec.java:364)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> 	at scala.Predef$.assert(Predef.scala:170)
> 	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:163)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1218)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> 	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
> 	at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
> 	at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
> 	at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:388)
> 	at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:339)
> 	at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:440)
> 	at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:428)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> 	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> 	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> 	at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:224)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)