[ 
https://issues.apache.org/jira/browse/BEAM-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JC updated BEAM-4597:
---------------------
    Attachment: BEAM-SPARK-KRYO-BUG.zip

> Serialization problem using SparkRunner and KryoSerializer from spark
> ---------------------------------------------------------------------
>
>                 Key: BEAM-4597
>                 URL: https://issues.apache.org/jira/browse/BEAM-4597
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.4.0
>            Reporter: JC
>            Assignee: Amit Sela
>            Priority: Major
>         Attachments: BEAM-SPARK-KRYO-BUG.zip
>
>
> When using the SparkRunner and specifying Spark to use the 'KryoSerializer' 
> as:
> {quote}spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.KryoSerializer /tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner
> {quote}
> We get an exception after 10 or 15 seconds:
> {quote}Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
> Serialization trace:
> factory (org.apache.beam.runners.core.metrics.MetricsMap)
> counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> metricsContainers (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
> metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
>  at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
>  at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>  at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
>  at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
>  at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
>  at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>  at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
>  at org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
> {quote}
> But when using the SparkRunner and specifying Spark to use the 
> 'JavaSerializer' as:
> {quote}spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.JavaSerializer /tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner
> {quote}
> The pipeline works correctly.
> Our deployment consists of CDH 5.14.2 (Parcels) and Spark 2:
> {quote}spark-submit --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
>       /_/
>
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_151
> Branch HEAD
> Compiled by user jenkins on 2018-04-10T23:08:17Z
> Revision 9f5baab06f127486a030024877fc13a3992f100f
> Url git://github.mtv.cloudera.com/CDH/spark.git
> Type --help for more information.
> {quote}
> I have attached a sample Maven project that reads data from Kafka (localhost) 
> and simply echoes the incoming data, to reproduce this bug; please refer to 
> the README for the full stack trace and instructions on how to build the 
> sample.
>  
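For context, a minimal standalone sketch (hypothetical class and method names, not taken from the attached project) of the underlying difference the report observes: Kryo's default field serializer records a lambda's synthetic class by name and later resolves it with Class.forName, which fails for names like MetricsContainerImpl$$Lambda$31/1875283985, whereas Java serialization routes serializable lambdas through SerializedLambda and round-trips them cleanly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class LambdaSerializationDemo {

    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Stand-in for the MetricsMap "factory" lambda named in the trace.
    static SerializableSupplier<String> factory = () -> "counter";

    // Kryo resolves a class written by name via Class.forName; a lambda's
    // synthetic class name (e.g. ...$$Lambda$1/1234567) is not resolvable
    // that way, which is the "Unable to find class" failure mode.
    static boolean resolvableByName() {
        try {
            Class.forName(factory.getClass().getName());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Java serialization instead goes through the lambda's writeReplace()
    // and SerializedLambda, so a round trip succeeds.
    static String javaRoundTrip() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                @SuppressWarnings("unchecked")
                Supplier<String> copy = (Supplier<String>) in.readObject();
                return copy.get();
            }
        } catch (Exception e) {
            return "error: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println("resolvable by name: " + resolvableByName());
        System.out.println("java round trip:    " + javaRoundTrip());
    }
}
```

This mirrors why the pipeline runs with spark.serializer=JavaSerializer but aborts with KryoSerializer once the Metadata object carrying the metrics containers is shipped between JVMs.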



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
