[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15088794#comment-15088794
 ] 

David Winters commented on SPARK-5569:
--------------------------------------

I have also encountered this issue.  I have a Spark Streaming application that 
ingests messages from Kafka using the Direct API and checkpoints the DStream to 
ensure exactly-once delivery semantics, as described in the Spark Streaming 
programming guide.  When I gracefully stop and then restart the streaming app, 
I get the following exception:

{noformat}
16/01/07 17:30:49 WARN CheckpointReader: Error reading checkpoint from file 
file:/Users/dwinters/Documents/workspace/DSE/INFRASTRUCTURE/dse-bitbucket-stream/hdfs/checkpoint/meta-processor-job/checkpoint-1452216644000
 
java.io.IOException: java.lang.ClassNotFoundException: 
org.apache.spark.streaming.kafka.OffsetRange 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140) 
at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:184) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
at 
org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
 
at 
org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
 
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239) 
at 
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:623)
 
at 
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:662)
 
at 
org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
 
at 
com.gopro.dse.bitbucket.stream.DseSparkKafkaBaseJob.getStreamingContext(DseSparkKafkaBaseJob.java:445)
at 
com.gopro.dse.bitbucket.stream.DseSparkKafkaBaseJob.doWork(DseSparkKafkaBaseJob.java:392)
 
at 
com.gopro.dse.bitbucket.stream.DseBitBucketStreamMetaProcessorJob.main(DseBitBucketStreamMetaProcessorJob.java:355)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: 
org.apache.spark.streaming.kafka.OffsetRange 
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
at java.lang.Class.forName0(Native Method) 
at java.lang.Class.forName(Class.java:274) 
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625) 
at 
org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
 
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) 
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) 
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) 
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:188)
 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137) 
... 34 more 
{noformat}

When I apply the one-line patch from this issue to the Spark 1.3 code base that 
our distribution includes, the exception goes away.  So the patch definitely 
resolves the issue described here.  I am working with our Spark distribution 
provider, Cloudera, to include this fix in their next patch.

My question is whether there is a workaround for this issue besides the 
code fix.  I tried setting the "-Dsun.lang.ClassLoader.allowArraySyntax=true" 
Java option, as mentioned in the last comment, but that didn't resolve it.  I 
was just wondering if I missed something obvious before we go patching a rather 
old release of Spark (1.3).

> Checkpoints cannot reference classes defined outside of Spark's assembly
> ------------------------------------------------------------------------
>
>                 Key: SPARK-5569
>                 URL: https://issues.apache.org/jira/browse/SPARK-5569
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-1421100410000.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-1421100410000.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
>         at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
>         at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
>         at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at 
> org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
>         at 
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
>         at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
>         at example.CheckpointedExample.main(CheckpointedExample.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:274)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>         at 
> org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
>         at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at 
> scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at 
> scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>         at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         ... 52 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to