[
https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371401#comment-15371401
]
Stavros Kontopoulos edited comment on SPARK-16480 at 7/11/16 6:55 PM:
----------------------------------------------------------------------
>From the logs above in the SIGTERM case the shutdownhook here is called:
>https://github.com/apache/spark/blob/v1.6.1/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L734.
>
So i would expect the job to exit fine according to the old PR here:
https://issues.apache.org/jira/browse/SPARK-7776.
Why should i define my own, i guess that was before that PR or not?
Internally spark uses its own ShutdownHookManager:
https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala#L32
but at the end delegates to org.apache.hadoop.util.ShutdownHookManager:
https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala#L220.
According to the comments there it should work better call because it probably
respects order of hook calls while jvm fires them in parallel not in compliance
with their registration order. At the end though internally it just uses the
same Runtime.getRuntime().addShutdownHook call. The same call is used by
Scala's sys.addShutdownHook.
To summarize that point if signals are caught from the jvm i would expect the
shutdownhooks to stop things gracefully and that is a bug according the
description of this issue, correct? I mean maybe the code in
FileBasedWriteAheadLogReader needs to handle
signal scenarios or spark in general needs to either support such cases or not.
Moreover i am not so sure things will work across envs even if shutdownhooks in
general are caught most of the time.
Using this script here: https://gist.github.com/nicoulaj/3758045 i see INT is
not supported as i said for:
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
on my local machine and that script actually uses sun.misc.Signal which i am
not sure if it is deprecated or not.
Another weird thing is the same job is reported to work with mapWithState but
not with updateStateByKey.
Thanx for pointing to the code you have.
I will try a few things and i will let you know, specifically also define my
shutdownhook although the one defined by the old PR was called.
was (Author: skonto):
>From the logs above in the SIGTERM case the shutdownhook here is called:
>https://github.com/apache/spark/blob/v1.6.1/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L734.
>
So i would expect the job to exit fine according to the old PR here:
https://issues.apache.org/jira/browse/SPARK-7776.
Why i should define my own correct, i guess that was before that PR or not?
Internally spark uses its own ShutdownHookManager:
https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala#L32
but at the end delegates to org.apache.hadoop.util.ShutdownHookManager:
https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala#L220.
According to the comments there it should work better call because it probably
respects order of hook calls while jvm fires them in parallel not in compliance
with their registration order. At the end though internally it just uses the
same Runtime.getRuntime().addShutdownHook call. The same call is used by
Scala's sys.addShutdownHook.
To summarize that point if signals are caught from the jvm i would expect the
shutdownhooks to stop things gracefully and that is a bug according the
description of this issue, correct? I mean maybe the code in
FileBasedWriteAheadLogReader needs to handle
signal scenarios or spark in general needs to either support such cases or not.
Moreover i am not so sure things will work across envs even if shutdownhooks in
general are caught most of the time.
Using this script here: https://gist.github.com/nicoulaj/3758045 i see INT is
not supported as i said for:
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
on my local machine and that script actually uses sun.misc.Signal which i am
not sure if it is deprecated or not.
Another weird thing is the same job is reported to work with mapWithState but
not with updateStateByKey.
Thanx for pointing to the code you have.
I will try a few things and i will let you know, specifically also define my
shutdownhook although the one defined by the old PR was called.
> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
> Key: SPARK-16480
> URL: https://issues.apache.org/jira/browse/SPARK-16480
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.1
> Reporter: Stavros Kontopoulos
>
> A customer gets the following exception when tries to stop gracefully a
> streaming job with SIGTERM:
> {quote}
> org.apache.spark.SparkException: RDD transformations and actions can only be
> invoked by the driver, not inside of other transformations; for example,
> rdd1.map(x => rdd2.values.count() * x) is invalid because the values
> transformation and count action cannot be performed inside of the rdd1.map
> transformation. For more information, see SPARK-5063.
> at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> {quote}
> This obviously implies invalid state for checkpointing data. This can be
> reproduced easily with the following code skeleton (Kafka direct stream is
> used):
> {quote}
> val dataStream = myKafkaDirectStream.
> .mapWithState(stateSpec).stateSnapshots().foreachRDD { data =>
> // back up the state
> data.cache()
> data.collect().foreach(println)
> data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
> ..
> }
> {quote}
> inputPath above is both the initialRdd and the checkpointing dir (using a
> different path does not affect the issue).
> Streaming context is created correctly with getOrCreate and all
> transformations and actions are put in
> that function.
> In order to reproduce you just run the job (1st run), stop it with kill -15
> ... and then restart it (2nd run).
> To reproduce the issue you can use an empty local folder for the "inputPath"
> the checkpointing path.
> The checkpointing blocks from the first run:
> {quote}
> 16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for
> time 1468234791000 ms: [
> 4 checkpoint files
> 1468234791000 ms -> [Lscala.Tuple4;@60a55c45
> 1468234790000 ms -> [Lscala.Tuple4;@58e5be3
> 1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
> 1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
> ]
> 16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data
> for time 1468234791000 ms: [
> 0 checkpoint files
> ]
> 16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time
> 1468234791000 ms: [
> 0 checkpoint files
> ]
> 16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time
> 1468234791000 ms: [
> 0 checkpoint files
> ]
> {quote}
> In the first run i successfully see:
> {quote}
> 16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true)
> from shutdown hook
> {quote}
> Here is the log output from the second run, when the recovery occurs from the
> checkpointing data:
> {quote}
> 16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next
> item, EOF reached
> java.io.EOFException
> at java.io.DataInputStream.readInt(DataInputStream.java:392)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at
> scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
> at
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
> 16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches):
> 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms,
> 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms,
> 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches):
> 16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches):
> 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms,
> 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms,
> 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000
> ms
> 16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is
> valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is
> valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is
> valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is
> valid
> 16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is
> invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and
> difference is 0 ms
> 16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking
> it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be
> invoked by the driver, not inside of other transformations; for example,
> rdd1.map(x => rdd2.values.count() * x) is invalid because the values
> transformation and count action cannot be performed inside of the rdd1.map
> transformation. For more information, see SPARK-5063.
> at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at
> org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
> {quote}
> Maybe the first exception in the 2nd run relates to:
> https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=upfg...@mail.gmail.com%3E
> and causes the issue (not sure what is happening there).
> The issue was reported to happen also when using HDFS.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]