Chandramouli Muthukumaran created SPARK-21844:
-------------------------------------------------
Summary: Checkpointing issue in Spark Streaming involving DataFrames
Key: SPARK-21844
URL: https://issues.apache.org/jira/browse/SPARK-21844
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 2.1.0
Environment: Spark 2.1.0, Kafka 0.10
Reporter: Chandramouli Muthukumaran
I recently started working with Spark Streaming and am implementing
checkpointing, storing the checkpoints in HDFS. When the streaming application
fails and restarts, it recovers from the last checkpoint, but it then throws a
NullPointerException and the streaming job is killed. I can see the checkpoints
in HDFS, so I am not sure why I get the exception even though a checkpoint
exists there. Any input would be helpful; I am not sure whether this is a bug.
{code:java}
package ca.twitter2

import org.apache.kafka.clients._
import org.apache.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.log4j._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import java.util.HashMap
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}

object NGINXLogProcessingWindowedwithcheckpointv2 {

  case class AccessLog(Datetime: String, requesterip: String, httpcode: String,
                       method: String, serverip2: String, responsetime: String,
                       operation: String, application: String)

  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint6"
  val WINDOW_LENGTH = Seconds(43200)
  val SLIDE_INTERVAL = Seconds(120)

  def creatingFunc(): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf()
      .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(120))
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    //val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
    ssc.checkpoint(checkpointDir)

    val spark = SparkSession
      .builder()
      .getOrCreate()

    val topics = List("REST").toSet
    //Logger.getLogger("org").setLevel(Level.ERROR)
    //Logger.getLogger("akka").setLevel(Level.ERROR)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      //"bootstrap.servers" -> "10.71.52.119:9092",
      //"bootstrap.servers" -> "192.168.123.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Create the direct stream with the Kafka parameters and topics
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
    //kafkaStream.checkpoint(Seconds(600))

    val lines = kafkaStream.map(_.value()).repartition(4)
    val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    val lines2 = lineswindowed.map(_.split(","))
    val lines4slide = lines2.map(p => AccessLog(p(0), p(2), p(4), p(3), p(8), p(7), p(10), p(12)))

    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        val count = rdd2.count
        println("count received " + count)
        import org.apache.spark.sql.functions._
        import spark.implicits._
        rdd2.checkpoint
        val LogDF = rdd2.toDF()
        LogDF.createOrReplaceTempView("Log")
        val LogDFslide = LogDF.select(
          $"Datetime",
          $"requesterip".cast("string"),
          $"httpcode",
          expr("(split(method, ' '))[1]").cast("string").as("request"),
          expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
          expr("(split(method, ' '))[3]").cast("string").as("protocol"),
          $"serverip2",
          $"responsetime",
          expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
          $"application".cast("string"))
        LogDFslide.createOrReplaceTempView("LogDFslide")
        //LogDFslide.printSchema()
        //LogDFslide.show
        val Log2DFslide = spark.sql(
          "SELECT Datetime, requesterip, httpcode, substring(request,2,length(request)) AS request2, " +
          "webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] AS webservice3, responsetime, " +
          "substring(operationtype,1,length(operationtype)-4) AS httpsoapaction, application FROM LogDFslide")
        Log2DFslide.createOrReplaceTempView("Log2DFslide")
        val Log2DFslideoutput = spark.sql(
          "SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, " +
          "split(webservice3, '[?]')[0] AS webservice, responsetime, httpsoapaction, application FROM Log2DFslide")
        //Log2DFslide.show
        //println("printing line3")
        //Log2DFslideoutput.show
        //Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")
        val log2DFFilter = spark.sql(
          "SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, " +
          "split(webservice3, '[?]')[0] AS webservice2, responsetime, httpsoapaction, application " +
          "FROM Log2DFslide WHERE responsetime <> '-' AND responsetime <> ''")
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        //log2DFFilter.printSchema()
        log2DFFilter.show
        val Log3DFslide = spark.sql(
          "SELECT initcap(webservice2) AS webservice, round(avg(responsetime),4) AS Averageresponsetime " +
          "FROM log2DFFilter WHERE webservice2 <> '' GROUP BY initcap(webservice2)")
        //val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime")).groupBy(expr("initcap(webservice2)"))
        //Log3DFslide.printSchema()
        Log3DFslide.createOrReplaceTempView("Log3DFslide")
        Log3DFslide.show
        //Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
      }
    }
    ssc
  }

  def main(args: Array[String]) {
    val context = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc _)
    //val ssc = StreamingContext.getOrCreate(checkpointDir, () => creatingFunc())
    context.start()
    context.awaitTermination()
  }
}
{code}
I get the following error:
{code:java}
17/08/26 13:41:00 ERROR JobScheduler: Error running job streaming job 1503776400000 ms.0
java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
    at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
    at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
    at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
    at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
{code}
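
The NullPointerException originates in SQLImplicits.rddToDatasetHolder, which suggests that after recovery from the checkpoint, the spark session captured by the foreachRDD closure (via import spark.implicits._) is no longer valid. A possible workaround, untested here, is the pattern from the Spark Streaming programming guide's "DataFrame and SQL Operations" section: re-acquire the SparkSession lazily inside foreachRDD from the RDD's SparkContext instead of capturing a session created in creatingFunc(). A minimal sketch, assuming the rest of the job above is unchanged:

{code:java}
lines4slide.foreachRDD { rdd2 =>
  if (!rdd2.isEmpty) {
    // Re-acquire the session from the RDD's SparkContext on every batch, so that
    // a context restored from a checkpoint does not hold a stale session reference.
    val spark = SparkSession.builder.config(rdd2.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    val LogDF = rdd2.toDF()
    LogDF.createOrReplaceTempView("Log")
    // ... remaining DataFrame/SQL logic as above ...
  }
}
{code}

Because getOrCreate() reuses the active session when one exists, this adds no overhead on normal batches; it only matters on the first batch after a checkpoint restore.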