[ 
https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stavros Kontopoulos updated SPARK-16480:
----------------------------------------
    Description: 
A customer gets the following exception when trying to gracefully stop a 
streaming job with SIGTERM:

{quote}
org.apache.spark.SparkException: RDD transformations and actions can only be 
invoked by the driver, not inside of other transformations; for example, 
rdd1.map(x => rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map 
transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
{quote}

This suggests that the checkpointing data is left in an invalid state. The 
problem can be reproduced easily with the following code skeleton (a Kafka 
direct stream is used):

{quote}
val dataStream = myKafkaDirectStream
  .mapWithState(stateSpec).stateSnapshots().foreachRDD { data =>

  // back up the state
  data.cache()
  data.collect().foreach(println)
  data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
..
}
{quote}

inputPath above is used both for the initialRdd and as the checkpointing dir 
(using a different path does not affect the issue).

The streaming context is created correctly with getOrCreate, and all 
transformations and actions are set up inside the function passed to it.

To reproduce, run the job (1st run), stop it with kill -15 ..., and then 
restart it (2nd run).

An empty local folder can be used for the "inputPath", i.e. the checkpointing 
path.

The checkpointing blocks from the first run:

{quote}
16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
4 checkpoint files 
1468234791000 ms -> [Lscala.Tuple4;@60a55c45
1468234790000 ms -> [Lscala.Tuple4;@58e5be3
1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
]
16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files 

]
16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files 

]
16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files 

]

{quote}
In the first run I can see the graceful stop being invoked:

{quote}
16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
{quote}
Here is the log output from the second run, when the recovery occurs from the 
checkpointing data:

{quote}
16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:392)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
        at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches): 
16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
        at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)

{quote}
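
For readers of the log above, the "is valid" / "is invalid" lines can be 
read as a simple arithmetic check on the batch time. The following is a 
minimal sketch of the rule that the log message itself implies (a time is 
accepted only if it is strictly after zeroTime and a whole number of slide 
durations away from it). The object and method names (TimeValiditySketch, 
isTimeValid) and the use of plain millisecond Longs are assumptions made for 
illustration; this is not Spark's actual DStream code.

```scala
// Simplified sketch of the batch-time validity rule implied by the log line
// "Time ... is invalid as zeroTime is ... and slideDuration is ... and
// difference is ...". Hypothetical names; times are plain milliseconds.
object TimeValiditySketch {

  /** A batch time is treated as valid only if it is strictly after zeroTime
    * and lies a whole number of slide durations away from it. */
  def isTimeValid(timeMs: Long, zeroTimeMs: Long, slideDurationMs: Long): Boolean = {
    require(slideDurationMs > 0, "slideDurationMs must be positive")
    val differenceMs = timeMs - zeroTimeMs
    differenceMs > 0 && differenceMs % slideDurationMs == 0
  }

  def main(args: Array[String]): Unit = {
    // Values taken from the second-run log.
    val zeroTimeMs = 1468234787000L
    val slideDurationMs = 1000L
    val times = Seq(1468234791000L, 1468234790000L, 1468234789000L,
      1468234788000L, 1468234787000L)

    times.foreach { t =>
      val verdict = if (isTimeValid(t, zeroTimeMs, slideDurationMs)) "valid" else "invalid"
      println(s"Time $t ms is $verdict (difference is ${t - zeroTimeMs} ms)")
    }
  }
}
```

Under this reading, the single "invalid" line for 1468234787000 ms is just 
the walk back through earlier batches reaching zeroTime (difference 0 ms); 
it is logged at INFO level, and the ERROR that follows is a separate event.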

The first exception in the 2nd run (the java.io.EOFException) may be related 
to the thread below and may be what causes the issue (not sure what is 
happening there):

https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=upfg...@mail.gmail.com%3E
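
The second stack trace shows the SparkException being raised from the RDD's 
context accessor (RDD.scala:87), reached via 
MapWithStateRDD.createFromPairRDD during recovery. One possible reading, 
stated here as an assumption and not as a confirmed diagnosis, is that an RDD 
restored from serialized checkpoint data no longer holds a reference to a live 
context. The sketch below does not use Spark at all: FakeContext, FakeRdd and 
TransientFieldSketch are hypothetical stand-ins that only demonstrate the 
general JVM behaviour that a @transient field comes back as null after Java 
serialization, so a guard on that field starts throwing.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins, not Spark classes. FakeContext plays the role of a
// driver-side context that is deliberately not serializable.
class FakeContext(val name: String)

// FakeRdd keeps its context in a @transient field, so the field is skipped
// when the object is serialized and is null after deserialization.
class FakeRdd(initialContext: FakeContext) extends Serializable {
  @transient private val ctx: FakeContext = initialContext

  // Guard that refuses to hand out a missing context.
  def context: FakeContext = {
    if (ctx == null) {
      throw new IllegalStateException("no context available on this instance")
    }
    ctx
  }
}

object TransientFieldSketch {

  /** Serializes a value to bytes and reads it back, as a checkpoint would. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new FakeRdd(new FakeContext("driver"))
    println(s"original has context: ${scala.util.Try(original.context).isSuccess}")

    val recovered = roundTrip(original)
    println(s"recovered has context: ${scala.util.Try(recovered.context).isSuccess}")
  }
}
```

Whether this is what actually happens to the state RDD in this job is not 
established by the logs; the sketch only shows why an object that works in 
the first run could fail the same guard after being restored.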

The issue was also reported to happen when using HDFS.



> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
>                 Key: SPARK-16480
>                 URL: https://issues.apache.org/jira/browse/SPARK-16480
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Stavros Kontopoulos


