[ https://issues.apache.org/jira/browse/SPARK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-21844.
-------------------------------
Resolution: Invalid
"Why isn't this working" questions are better for StackOverflow or the mailing
list.
> Checkpointing issue in Spark Streaming involving Dataframes
> -----------------------------------------------------------
>
> Key: SPARK-21844
> URL: https://issues.apache.org/jira/browse/SPARK-21844
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.0
> Environment: Spark 2.1.0, Kafka 0.10
> Reporter: Chandramouli Muthukumaran
>
> I recently started with Spark Streaming and am implementing checkpointing,
> storing the checkpoints in HDFS. When the streaming job fails it goes back to
> the last checkpoint, but it then throws a NullPointerException and the job is
> killed. I can see the checkpoints in HDFS, so I am not sure why I get the
> exception even though a checkpoint is there. Any input would be helpful. I am
> not sure if this is a bug.
> {code:java}
> package ca.twitter2
>
> import org.apache.kafka.clients._
> import org.apache.kafka._
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.log4j._
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import java.util.HashMap
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.log4j.{Level, Logger}
>
> object NGINXLogProcessingWindowedwithcheckpointv2 {
>   case class AccessLog(Datetime: String, requesterip: String, httpcode: String,
>                        method: String, serverip2: String, responsetime: String,
>                        operation: String, application: String)
>
>   val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint6"
>   val WINDOW_LENGTH = Seconds(43200)
>   val SLIDE_INTERVAL = Seconds(120)
>
>   def creatingFunc(): StreamingContext = {
>     println("Creating new context")
>     val sparkConf = new SparkConf()
>       .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
>       .setMaster("local")
>     val ssc = new StreamingContext(sparkConf, Seconds(120))
>     sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     //val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
>
>     ssc.checkpoint(checkpointDir)
>
>     val spark = SparkSession
>       .builder()
>       .getOrCreate()
>
>     val topics = List("REST").toSet
>     //Logger.getLogger("org").setLevel(Level.ERROR)
>     //Logger.getLogger("akka").setLevel(Level.ERROR)
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "10.24.18.36:6667",
>       //"bootstrap.servers" -> "10.71.52.119:9092",
>       //"bootstrap.servers" -> "192.168.123.36:6667",
>       "group.id" -> "2",
>       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
>       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>
>     // Create the direct stream with the Kafka parameters and topics
>     val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
>     val kafkaStream = KafkaUtils.createDirectStream[String, String](
>       ssc, LocationStrategies.PreferConsistent, consumerStrategy)
>     //kafkaStream.checkpoint(Seconds(600))
>     val lines = kafkaStream.map(_.value()).repartition(4)
>     val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
>     val lines2 = lineswindowed.map(_.split(","))
>     val lines4slide = lines2.map(p => AccessLog(p(0), p(2).toString, p(4).toString,
>       p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
>
>     lines4slide.foreachRDD { rdd2 =>
>       if (!rdd2.isEmpty) {
>         val count = rdd2.count
>         println("count received " + count)
>         import org.apache.spark.sql.functions._
>         import spark.implicits._
>         rdd2.count
>         rdd2.checkpoint
>
>         val LogDF = rdd2.toDF()
>         LogDF.createOrReplaceTempView("Log")
>         val LogDFslide = LogDF.select(
>           $"Datetime",
>           $"requesterip".cast("string"),
>           $"httpcode",
>           expr("(split(method, ' '))[1]").cast("string").as("request"),
>           expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
>           expr("(split(method, ' '))[3]").cast("string").as("protocol"),
>           $"serverip2",
>           $"responsetime",
>           expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
>           $"application".cast("string"))
>         LogDFslide.createOrReplaceTempView("LogDFslide")
>         //LogDFslide.printSchema()
>         //LogDFslide.show
>         val Log2DFslide = spark.sql(
>           "SELECT Datetime, requesterip, httpcode, " +
>           "substring(request,2,length(request)) as request2, webserviceurl, protocol, serverip2, " +
>           "split(webserviceurl, '/')[3] as webservice3, responsetime, " +
>           "substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application " +
>           "FROM LogDFslide")
>         Log2DFslide.createOrReplaceTempView("Log2DFslide")
>         val Log2DFslideoutput = spark.sql(
>           "SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, " +
>           "split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application " +
>           "FROM Log2DFslide")
>         //Log2DFslide.show
>         //println("printing line3")
>         //Log2DFslideoutput.show
>         //Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")
>         val log2DFFilter = spark.sql(
>           "SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, " +
>           "split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application " +
>           "FROM Log2DFslide WHERE responsetime <> '-' AND responsetime <> ''")
>         log2DFFilter.createOrReplaceTempView("log2DFFilter")
>         //log2DFFilter.printSchema()
>         log2DFFilter.show
>         val Log3DFslide = spark.sql(
>           "SELECT initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime " +
>           "FROM log2DFFilter WHERE webservice2 <> '' GROUP BY initcap(webservice2)")
>         //val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"),
>         //  expr("round(avg(responsetime),4)").as("Averageresponsetime")).groupBy(expr("initcap(webservice2)"))
>         //Log3DFslide.printSchema()
>         Log3DFslide.createOrReplaceTempView("Log3DFslide")
>         Log3DFslide.show
>         //Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
>       }
>     }
>
>     ssc
>   }
>
>   def main(args: Array[String]) {
>     val context = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc _)
>     //val ssc = StreamingContext.getOrCreate(checkpointDir, () => { creatingFunc(checkpointDir) })
>     context.start()
>     context.awaitTermination()
>   }
> }
> {code}
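>
> My guess, and I am not sure about this, is that the `spark` session built in
> creatingFunc() gets captured in the foreachRDD closure and serialized into the
> checkpoint, so after recovery the closure holds a dead session and the
> implicit conversion fails. Here is a minimal sketch of what I understand the
> Spark Streaming programming guide recommends instead: rebuild the session from
> the RDD's own SparkContext on each batch (the SQL is elided, names are mine):
> {code:java}
> lines4slide.foreachRDD { rdd2 =>
>   if (!rdd2.isEmpty) {
>     // Get the session from the RDD's context instead of closing over one
>     // created in creatingFunc(); nothing stale is serialized into the
>     // checkpoint, so this should also work after recovery.
>     val spark = SparkSession.builder
>       .config(rdd2.sparkContext.getConf)
>       .getOrCreate()
>     import spark.implicits._
>
>     val logDF = rdd2.toDF()
>     logDF.createOrReplaceTempView("Log")
>     // ... same SQL as above ...
>   }
> }
> {code}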
> I get the following error:
> {code:java}
> 17/08/26 13:41:00 ERROR JobScheduler: Error running job streaming job 1503776400000 ms.0
> java.lang.NullPointerException
>         at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
>         at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
>         at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>         at java.lang.Thread.run(Unknown Source)
> Exception in thread "main" java.lang.NullPointerException
>         [identical stack trace as above]
> {code}
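>
> The top frame, SQLImplicits.rddToDatasetHolder, makes me think the SQLContext
> behind spark.implicits._ is null once the job is restored from the checkpoint.
> For reference, the lazily instantiated singleton from the guide's "DataFrame
> and SQL Operations" example, which I believe is the intended pattern here (the
> object name is the guide's, not part of my code):
> {code:java}
> object SparkSessionSingleton {
>   @transient private var instance: SparkSession = _
>
>   def getInstance(sparkConf: SparkConf): SparkSession = {
>     if (instance == null) {
>       // Created on first use on the driver, never written to the checkpoint.
>       instance = SparkSession.builder.config(sparkConf).getOrCreate()
>     }
>     instance
>   }
> }
>
> // Inside foreachRDD:
> // val spark = SparkSessionSingleton.getInstance(rdd2.sparkContext.getConf)
> {code}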
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]