fangfengbin created SPARK-20436:
-----------------------------------
Summary: NullPointerException when restart from checkpoint file
Key: SPARK-20436
URL: https://issues.apache.org/jira/browse/SPARK-20436
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 1.5.0
Reporter: fangfengbin
I have written a Spark Streaming application which have two DStreams.
Code is :
object KafkaTwoInkfk {
def main(args: Array[String]) {
val Array(checkPointDir, brokers, topic1, topic2, batchSize) = args
val ssc = StreamingContext.getOrCreate(checkPointDir, () =>
createContext(args))
ssc.start()
ssc.awaitTermination()
}
def createContext(args : Array[String]) : StreamingContext = {
val Array(checkPointDir, brokers, topic1, topic2, batchSize) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(batchSize.toLong))
ssc.checkpoint(checkPointDir)
val topicArr1 = topic1.split(",")
val topicSet1 = topicArr1.toSet
val topicArr2 = topic2.split(",")
val topicSet2 = topicArr2.toSet
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers
)
val lines1 = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicSet1)
val words1 = lines1.map(_._2).flatMap(_.split(" "))
val wordCounts1 = words1.map(x => {
(x, 1L)}).reduceByKey(_ + _)
wordCounts1.print()
val lines2 = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicSet2)
val words2 = lines1.map(_._2).flatMap(_.split(" "))
val wordCounts2 = words2.map(x => {
(x, 1L)}).reduceByKey(_ + _)
wordCounts2.print()
return ssc
}
}
when restart from checkpoint file, it throw NullPointerException:
java.lang.NullPointerException
at
org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:126)
at
org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
at
org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1291)
at
org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:528)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:523)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:523)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1291)
at
org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:523)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:190)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:185)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:185)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1291)
at
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:185)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:145)
at
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:145)
at
org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1325)
at
org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:146)
at
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:524)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:572)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)
at com.spark.test.KafkaTwoInkfk$.main(KafkaTwoInkfk.scala:21)
at com.spark.test.KafkaTwoInkfk.main(KafkaTwoInkfk.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:760)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:190)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:215)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:129)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]