[ https://issues.apache.org/jira/browse/BEAM-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
JC updated BEAM-4597:
---------------------
Attachment: BEAM-SPARK-KRYO-BUG.zip
> Serialization problem using SparkRunner and KryoSerializer from spark
> ---------------------------------------------------------------------
>
> Key: BEAM-4597
> URL: https://issues.apache.org/jira/browse/BEAM-4597
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.4.0
> Reporter: JC
> Assignee: Amit Sela
> Priority: Major
> Attachments: BEAM-SPARK-KRYO-BUG.zip
>
>
> When we run the pipeline with the SparkRunner and configure Spark to use the
> 'KryoSerializer':
> {quote}spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark \
>   --master yarn --deploy-mode client \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   /tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner
> {quote}
> we get the following exception after 10 to 15 seconds:
> {quote}Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
> Serialization trace:
> factory (org.apache.beam.runners.core.metrics.MetricsMap)
> counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> metricsContainers (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
> metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
> at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
> at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
> at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
> at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
> at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
> at org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
> {quote}
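The serialization trace shows that Kryo fails on the `factory` field of `MetricsMap`, whose runtime class name (`MetricsContainerImpl$$Lambda$31/1875283985`) is that of a JVM-generated lambda class. The "Unable to find class" message suggests that Kryo tries to resolve that class by name when reading the task result back; this is an assumption based on the message, not something verified against Kryo's source. The minimal sketch below (the class `LambdaClassLookup` and its methods are hypothetical, not part of Beam, Spark or Kryo) shows that such a lambda class name cannot be turned back into a class with `Class.forName`, while an ordinary class name can.

```java
import java.util.function.Function;

/**
 * Hypothetical demo class (not part of Beam, Spark or Kryo): shows that the
 * runtime class of a Java lambda cannot be looked up again by its name.
 */
public class LambdaClassLookup {

    /**
     * Returns the runtime class name of a freshly created lambda. The JVM
     * generates such names itself; they contain "$$Lambda" and a "/".
     */
    static String sampleLambdaClassName() {
        Function<String, Integer> length = s -> s.length();
        return length.getClass().getName();
    }

    /**
     * Tries to load a class from its name, similar to what a name-based
     * deserializer has to do when it reads an unregistered class back.
     */
    static boolean resolvableByName(String className) {
        try {
            Class.forName(className, false, LambdaClassLookup.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String lambdaName = sampleLambdaClassName();
        System.out.println("lambda class: " + lambdaName);
        // The generated lambda class is not reachable through its name.
        System.out.println("lambda resolvable by name: " + resolvableByName(lambdaName));
        // A regular class is found without problems.
        System.out.println("ArrayList resolvable by name: " + resolvableByName("java.util.ArrayList"));
    }
}
```

If this is indeed the mechanism, any serializer that records only the class name of the `factory` lambda would fail in the same way on the reading side.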
> However, when we keep the SparkRunner and configure Spark to use the
> 'JavaSerializer' instead:
> {quote}spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark \
>   --master yarn --deploy-mode client \
>   --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
>   /tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner
> {quote}
> The pipeline works correctly.
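A possible explanation for the difference, offered as an assumption rather than a confirmed diagnosis: Java serialization does not need to look a lambda class up by name. A lambda whose target type is `Serializable` is written as a `java.lang.invoke.SerializedLambda` and recreated on read through the class that created it. Since the pipeline works under `JavaSerializer`, the `factory` lambda is presumably serializable in this sense. The minimal sketch below round-trips such a lambda; `SerializableLambdaRoundTrip` and the `SerFunction` interface are hypothetical stand-ins, not Beam's actual types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

/** Hypothetical demo class: round-trips a serializable lambda through Java serialization. */
public class SerializableLambdaRoundTrip {

    /** Hypothetical serializable function type; a lambda targeting it is serializable. */
    interface SerFunction<A, B> extends Function<A, B>, Serializable {}

    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // The lambda's writeReplace() substitutes a SerializedLambda here.
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // SerializedLambda.readResolve() asks the capturing class to rebuild the lambda.
            return in.readObject();
        }
    }

    /** Serializes a lambda, deserializes it and applies the copy to the input. */
    @SuppressWarnings("unchecked")
    static int roundTripAndApply(String input) {
        SerFunction<String, Integer> length = s -> s.length();
        try {
            SerFunction<String, Integer> copy =
                    (SerFunction<String, Integer>) fromBytes(toBytes(length));
            return copy.apply(input);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("lambda round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripAndApply("beam")); // prints 4
    }
}
```

The deserialized copy gets a new generated class in the reading JVM, which is why the original lambda class name never has to be resolved.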
> Our deployment consists of CDH 5.14.2 (installed from parcels) with Spark 2:
> {quote}spark-submit --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
>       /_/
>
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_151
> Branch HEAD
> Compiled by user jenkins on 2018-04-10T23:08:17Z
> Revision 9f5baab06f127486a030024877fc13a3992f100f
> Url git://github.mtv.cloudera.com/CDH/spark.git
> Type --help for more information.
> {quote}
> To reproduce this bug, I have attached a sample Maven project that reads data
> from Kafka (localhost) and simply echoes the incoming data. Please refer to its
> README for the full stack trace and for instructions on how to build the
> sample.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)