[ 
https://issues.apache.org/jira/browse/SPARK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-21844.
-------------------------------
    Resolution: Invalid

"Why isn't this working" questions are better for StackOverflow or the mailing 
list.

> Checkpointing issue in Spark Streaming involving Dataframes
> -----------------------------------------------------------
>
>                 Key: SPARK-21844
>                 URL: https://issues.apache.org/jira/browse/SPARK-21844
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0 , Kafka  0.10
>            Reporter: Chandramouli Muthukumaran
>
> I started using Spark Streaming recently and am implementing checkpointing. I'm 
> storing the checkpoint in HDFS. When the streaming job fails, it is able to go 
> back to the last checkpoint, but it then hits a NullPointerException and the 
> streaming job gets killed. I'm able to see the checkpoints in HDFS. I'm not 
> sure why I'm getting the exception even though there is a checkpoint in HDFS. 
> Any input will be helpful. I'm not sure if it is a bug.
> {code:java}
> package ca.twitter2
> import org.apache.kafka.clients._
> import org.apache.kafka._
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients._
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.log4j._
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import java.util.HashMap
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.log4j.{Level, Logger}
> object NGINXLogProcessingWindowedwithcheckpointv2 {    
>      case class AccessLog(Datetime: String, requesterip: String, httpcode: 
> String, method: String, serverip2: String, responsetime: String, operation: 
> String, application: String)
>       val checkpointDir = 
> "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint6"
>       val WINDOW_LENGTH = Seconds(43200)
>       val SLIDE_INTERVAL = Seconds(120)
>       
>    def creatingFunc(): StreamingContext = {
>    println("Creating new context")
>    val sparkConf = new 
> SparkConf().setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
>    
>   .setMaster("local")
>    val ssc = new StreamingContext(sparkConf, Seconds(120))
>     sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>    //val checkpointDir = 
> "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
>           
>         ssc.checkpoint(checkpointDir)
>               
>        val spark = SparkSession
>                    .builder()                  
>                    .getOrCreate()
>                    
>        val topics = List("REST").toSet
>        // Logger.getLogger("org").setLevel(Level.ERROR)
>         //Logger.getLogger("akka").setLevel(Level.ERROR)
>        val kafkaParams = Map[String, Object](
>                          "bootstrap.servers" -> "10.24.18.36:6667",
>                           //"bootstrap.servers" -> "10.71.52.119:9092",
>           // "bootstrap.servers" -> "192.168.123.36:6667",
>                          "group.id" -> "2",
>                           ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG 
> ->"org.apache.kafka.common.serialization.StringDeserializer",
>                           ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
> "org.apache.kafka.common.serialization.StringDeserializer",
>                           "auto.offset.reset" -> "latest",
>                           "enable.auto.commit" -> (false: java.lang.Boolean)
>                                              )
>    
>                // Create the direct stream with the Kafka parameters and 
> topics
>       val consumerStrategy = ConsumerStrategies.Subscribe[String, 
> String](topics, kafkaParams)
>       val  kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, 
> LocationStrategies.PreferConsistent, consumerStrategy)
>           //kafkaStream.checkpoint(Seconds(600))
>       val lines = kafkaStream.map(_.value()).repartition(4)
>       val lineswindowed =lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
>       val lines2= lineswindowed.map(_.split(","))
>       val lines4slide= lines2.map(p => 
> AccessLog(p(0),p(2).toString,p(4).toString,p(3).toString, p(8).toString, 
> p(7).toString, p(10), p(12)))
>             lines4slide.foreachRDD { rdd2 =>
>                if (!rdd2.isEmpty) {
>                     val count = rdd2.count
>                     println("count received " + count)
>                     import org.apache.spark.sql.functions._
>                     import spark.implicits._
>                     rdd2.count
>                    rdd2.checkpoint   
>                     
>                     val LogDF = rdd2.toDF()
>                                          LogDF.createOrReplaceTempView("Log")
>                     val LogDFslide = 
> LogDF.select($"Datetime",$"requesterip".cast("string"),$"httpcode",expr("(split(method,
>  ' '))[1]").cast("string").as("request"),expr("(split(method, ' 
> '))[2]").cast("string").as("webserviceurl"),expr("(split(method, ' 
> '))[3]").cast("string").as("protocol"), $"serverip2", 
> $"responsetime",expr("(split(operation, 
> '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
>                     LogDFslide.createOrReplaceTempView("LogDFslide")
>                     //LogDFslide.printSchema()
>                     //LogDFslide.show
>                     val Log2DFslide = spark.sql("SELECT 
> Datetime,requesterip,httpcode, substring(request,2,length(request))as 
> request2,webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as 
> webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) 
> as httpsoapaction, application FROM LogDFslide")
>                     Log2DFslide.createOrReplaceTempView("Log2DFslide")
>                     val Log2DFslideoutput = spark.sql("SELECT 
> Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, 
> split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, 
> application FROM Log2DFslide")                    // Log2DFslide.show   
>                     //println("printing line3")
>                     //Log2DFslideoutput.show
>                    // 
> Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed");
>                     val   log2DFFilter = spark.sql("SELECT 
> Datetime,requesterip,httpcode, request2,webserviceurl, protocol, serverip2, 
> split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, 
> application from  Log2DFslide where responsetime <>'-' and responsetime <>'' 
> ")  
>                     log2DFFilter.createOrReplaceTempView("log2DFFilter")
>                     //log2DFFilter.printSchema()
>                     log2DFFilter.show
>                     val Log3DFslide = spark.sql( "Select initcap(webservice2) 
> as webservice, round(avg(responsetime),4) as Averageresponsetime from 
> log2DFFilter where webservice2 <>'' group by initcap(webservice2) ")
>                     // val  Log3DFslide =  
> log2DFFilter.select(expr("initcap(webservice2)"), 
> expr("round(avg(responsetime),4)").as("Averageresponsetime")  
> ).groupBy(expr("initcap(webservice2)"))
>                 
>                     // Log3DFslide.printSchema()
>                     Log3DFslide.createOrReplaceTempView("Log3DFslide")
>                             
>                      Log3DFslide.show
>                     
> //Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics");
>                            
>                                  }
>                
>                                   } 
>                 
>                                                                 ssc
>    
>       } 
>        def main(args: Array[String]) { 
>     
>     
>         val context = StreamingContext.getActiveOrCreate(checkpointDir, 
> creatingFunc _)
>    
>         //val ssc = StreamingContext.getOrCreate(checkpointDir,() => { 
> creatingFunc(checkpointDir) })
>         context.start() 
>         context.awaitTermination() 
>      }
>         
>       
> }
> {code}
> I get the following error:
> {code:java}
> 17/08/26 13:41:00 ERROR JobScheduler: Error running job streaming job 
> 1503776400000 ms.0
> java.lang.NullPointerException
>       at 
> org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
>       at 
> ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
>       at 
> ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
>       at scala.util.Try$.apply(Try.scala:192)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>       at java.lang.Thread.run(Unknown Source)
> Exception in thread "main" java.lang.NullPointerException
>       at 
> org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
>       at 
> ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
>       at 
> ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
>       at scala.util.Try$.apply(Try.scala:192)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>       at java.lang.Thread.run(Unknown Source)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to