JC created BEAM-4597:
------------------------

             Summary: Serialization problem using SparkRunner and 
KryoSerializer from spark
                 Key: BEAM-4597
                 URL: https://issues.apache.org/jira/browse/BEAM-4597
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 2.4.0
            Reporter: JC
            Assignee: Amit Sela
         Attachments: BEAM-SPARK-KRYO-BUG.zip

When using the SparkRunner and specifying Spark to use the 'KryoSerializer' as:
{quote}spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark 
--master yarn --deploy-mode client --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer 
/tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner
{quote}
We get an exception after 10 or 15 seconds:
{quote}Exception in thread "main" java.lang.RuntimeException: 
org.apache.spark.SparkException: Job aborted due to stage failure: Exception 
while getting task result: com.esotericsoftware.kryo.KryoException: Unable to 
find class: 
org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
Serialization trace:
factory (org.apache.beam.runners.core.metrics.MetricsMap)
counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
metricsContainers (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
metricsContainers 
(org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
 at 
org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
 at 
org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
 at 
org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
 at 
org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
 at 
org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
 at 
org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
 at 
org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
 at org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
{quote}
But when using the SparkRunner and specifying Spark to use the 'JavaSerializer' 
as:
{quote}spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark 
--master yarn --deploy-mode client --conf 
spark.serializer=org.apache.spark.serializer.JavaSerializer 
/tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner
{quote}
The pipeline works correctly.

Our deployment consist of (CDH 5.14.2, Parcels) and Spark2

spark-submit --version
Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /___/ .__/\_,_/_/ /_/\_\ version 2.3.0.cloudera2
 /_/
 
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_151
Branch HEAD
Compiled by user jenkins on 2018-04-10T23:08:17Z
Revision 9f5baab06f127486a030024877fc13a3992f100f
Url git://github.mtv.cloudera.com/CDH/spark.git
Type --help for more information.

I have attached a sample maven project which read data from kafka (localhost) 
and just produce an echo of the incoming data to reproduce this bug, please 
refer to the README for the full Stacktrace and information of how to build the 
sample

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to