[ https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830758#comment-16830758 ]

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:39 PM:
-----------------------------------------------------------------------

Btw, if I apply the trick and put the mapping function in an object like this, then on restart with Spark 2.3.3 I get:
{quote}def createContext(checkpointDirectory: String, inputDirectory: String, outputDirectory: String): StreamingContext = {

...

  object T extends Serializable {
    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
  }

  val stateDstream = words.mapWithState(
    StateSpec.function(T.mappingFunc).initialState(initialRDD))

}
{quote}
{quote}2019-05-01 02:36:14 WARN BatchedWriteAheadLog:66 - BatchedWriteAheadLog Writer queue interrupted.
 org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
 (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
 (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
 at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:90)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
 at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:529)
 at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
 at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
{quote}
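
For context, here is roughly the recovery pattern the snippet above sits in. This is only a minimal sketch assuming the standard StreamingContext.getOrCreate setup; the directory values, the word-count pipeline and initialRDD are placeholders, not the exact code from the gist. Judging by the trace, the failure comes from MapWithStateRDD.createFromPairRDD repartitioning the recovered initial-state RDD, i.e. the SPARK-13758 case mentioned in the message.
{quote}import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object RecoverableMapWithState {

  // Keeping the mapping function in a serializable object is the "trick"
  // referenced above; it avoids capturing the enclosing driver/REPL class.
  object T extends Serializable {
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
  }

  def createContext(checkpointDirectory: String,
                    inputDirectory: String,
                    outputDirectory: String): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("RecoverableMapWithState")
      .setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory)

    val words = ssc.textFileStream(inputDirectory)
      .flatMap(_.split(" "))
      .map((_, 1))

    // The initial state is an RDD created by this streaming job itself; after
    // recovery its deserialized copy is what the stack trace above shows failing
    // inside createFromPairRDD.
    val initialRDD = ssc.sparkContext.parallelize(Seq(("hello", 1), ("world", 1)))

    val stateDstream = words.mapWithState(
      StateSpec.function(T.mappingFunc).initialState(initialRDD))
    stateDstream.saveAsTextFiles(outputDirectory + "/wordcount")
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = "/tmp/checkpoint"   // placeholder paths
    val inputDirectory = "/tmp/input"
    val outputDirectory = "/tmp/output"

    // On a fresh start getOrCreate calls createContext; on restart it deserializes
    // the checkpoint instead, and that recovery path is where the trace above appears.
    val ssc = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => createContext(checkpointDirectory, inputDirectory, outputDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
{quote}
(In the snippet above the object T sits inside createContext; it is hoisted to the top level here only so the sketch compiles as a standalone application.)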



> DStreams checkpointing does not work with the Spark Shell
> ---------------------------------------------------------
>
>                 Key: SPARK-27598
>                 URL: https://issues.apache.org/jira/browse/SPARK-27598
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>            Reporter: Stavros Kontopoulos
>            Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from file file:/tmp/checkpoint/checkpoint-1556566950000.bk
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.FileInputDStream.filter of type scala.Function1 in instance of org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is serialized as a java.lang.invoke.SerializedLambda and cannot be assigned back to a scala.Function1 when the checkpoint is deserialized.
> Details of how to reproduce it are here: [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6] (a rough sketch of the shell steps is also included below).
> Maybe this is spark-shell specific and is not expected to work anyway, as I don't see this issue with a normal jar.
> Note that checkpoint recovery still does not work with Spark 2.3.3, but it fails with a different error.
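>
> For reference, a rough sketch of the spark-shell steps (assumed from the description above, not the exact code in the gist; paths and the batch interval are placeholders):
> {quote}// paste into spark-shell; `sc` is the SparkContext the shell provides
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> def createContext(): StreamingContext = {
>   val ssc = new StreamingContext(sc, Seconds(5))
>   ssc.checkpoint("/tmp/checkpoint")
>   // textFileStream creates the FileInputDStream that ends up in the checkpoint
>   ssc.textFileStream("/tmp/input").print()
>   ssc
> }
>
> val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", createContext _)
> ssc.start()
>
> // Quit the shell, start a new one, and run the same lines again: getOrCreate now
> // reads the checkpoint and FileInputDStream.readObject fails with the
> // ClassCastException quoted above.
> {quote}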



