[
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486737#comment-14486737
]
yangping wu commented on SPARK-6770:
------------------------------------
Hi Saisai Shao, thank you for your reply. I've tried putting my streaming-related
logic into the function <tt>functionToCreateContext</tt>, as follows:
{code}
def functionToCreateContext() = {
val sparkConf = new SparkConf().setAppName("channelAnalyser")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/kafka/test/offset")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val test = Set("test")
val struct = StructType(StructField("log", StringType) ::Nil)
val kafkaParams = Map[String, String]("metadata.broker.list" ->
"192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092",
"group.id" -> "test-consumer-group111")
val url =
"jdbc:mysql://192.168.100.10:3306/spark?user=admin&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true"
val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, test)
SDB.foreachRDD(rdd => {
val result = rdd.map(item => {
item._2 match {
case e: String => Row.apply(e)
case _ => Row.apply("")
}
})
try {
println(result.count())
val df = sqlContext.createDataFrame(result, struct)
df.insertIntoJDBC(url, "testTable", overwrite = false)
} catch {
case e: Exception => e.printStackTrace()
}
})
ssc
}
val ssc = StreamingContext.getOrCreate("/tmp/kafka/test/offset",
functionToCreateContext)
ssc.start()
{code}
But when I recover the program from the checkpoint, I encounter an exception:
{code}
java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:217)
at
org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:191)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:381)
at
logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:57)
at
logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:40)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
{code}
It seems that the SQLContext has not been initialized, so <tt>settings</tt>
is not initialized in <tt>org.apache.spark.sql.SQLConf</tt>. As a result,
{code}
private[spark] def dataFrameEagerAnalysis: Boolean =
getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
{code}
throws a java.lang.NullPointerException.
> DirectKafkaInputDStream has not been initialized when recovery from checkpoint
> ------------------------------------------------------------------------------
>
> Key: SPARK-6770
> URL: https://issues.apache.org/jira/browse/SPARK-6770
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.0
> Reporter: yangping wu
>
> I am reading data from Kafka using the createDirectStream method and saving the
> received logs to MySQL. The code snippet is as follows:
> {code}
> def functionToCreateContext(): StreamingContext = {
> val sparkConf = new SparkConf()
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(10))
> ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory
> ssc
> }
> val struct = StructType(StructField("log", StringType) ::Nil)
> // Get StreamingContext from checkpoint data or create a new one
> val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset",
> functionToCreateContext)
> val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> SDB.foreachRDD(rdd => {
> val result = rdd.map(item => {
> println(item)
> val result = item._2 match {
> case e: String => Row.apply(e)
> case _ => Row.apply("")
> }
> result
> })
> println(result.count())
> val df = sqlContext.createDataFrame(result, struct)
> df.insertIntoJDBC(url, "test", overwrite = false)
> })
> ssc.start()
> ssc.awaitTermination()
> ssc.stop()
> {code}
> But when I recover the program from the checkpoint, I encounter an exception:
> {code}
> Exception in thread "main" org.apache.spark.SparkException:
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not
> been initialized
> at
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
> at
> org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
> at
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
> at
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
> at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
> at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
> I'm not sure whether this is a bug or a feature, but it's not obvious, so I wanted to
> create a JIRA to make sure we document this behavior. Can someone help me
> understand the cause? Thank you.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]