[
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830758#comment-16830758
]
Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:39 PM:
-----------------------------------------------------------------------
Btw, if I do the trick of putting the mappingFunction in an object like this, then with Spark 2.3.3 I get the following on restart:
{quote}def createContext(checkpointDirectory: String, inputDirectory: String, outputDirectory: String): StreamingContext = {
  ...
  object T extends Serializable {
    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
  }
  val stateDstream = words.mapWithState(
    StateSpec.function(T.mappingFunc).initialState(initialRDD))
}
{quote}
{quote}2019-05-01 02:36:14 WARN BatchedWriteAheadLog:66 - BatchedWriteAheadLog Writer queue interrupted.
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:90)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:529)
at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
{quote}
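For reference, case (2) in that exception is about RDD references baked into the checkpointed DStream graph; in this reproduction the captured RDD is the initialRDD handed to .initialState(...). Below is a minimal, self-contained sketch of the pattern being exercised (mapping function held in a top-level object, initial state built from an RDD inside the factory, recovery via StreamingContext.getOrCreate); the object name, paths and batch interval are illustrative and not taken from the gist:
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

// Top-level holder for the mapping function, mirroring the T object above.
object StatefulCount extends Serializable {
  val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }
}

def createContext(checkpointDir: String, inputDir: String): StreamingContext = {
  val ssc = new StreamingContext(
    new SparkConf().setAppName("StatefulCount"), Seconds(10))
  ssc.checkpoint(checkpointDir)

  val words = ssc.textFileStream(inputDir).flatMap(_.split(" ")).map((_, 1))

  // The initial-state RDD only exists inside this factory; on a restart the
  // factory is not called and the graph, including this RDD, is deserialized
  // from the checkpoint instead.
  val initialRDD = ssc.sparkContext.parallelize(Seq(("hello", 1), ("world", 1)))
  words.mapWithState(
    StateSpec.function(StatefulCount.mappingFunc).initialState(initialRDD)).print()
  ssc
}

// getOrCreate either builds a fresh context via the factory or recovers one
// from the checkpoint directory; the failure above happens on the recovery path.
val ssc = StreamingContext.getOrCreate(
  "/tmp/checkpoint", () => createContext("/tmp/checkpoint", "/tmp/input"))
ssc.start()
ssc.awaitTermination()
{code}
On a fresh start this pattern runs; the stack trace above is specific to the restart path, where the recovered initial-state RDD no longer has a live SparkContext.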
> DStreams checkpointing does not work with the Spark Shell
> ---------------------------------------------------------
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
> Reporter: Stavros Kontopoulos
> Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from file file:/tmp/checkpoint/checkpoint-1556566950000.bk
> java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.FileInputDStream.filter of type scala.Function1 in instance of org.apache.spark.streaming.dstream.FileInputDStream
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
> at org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in SerializedLambda form and cannot be assigned back to a scala.Function1.
> Details of how to reproduce it here:
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I don't see this issue with a normal jar.
> Note that with Spark 2.3.3 this still does not work, but it fails with a different error.
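> For what it's worth, the field named in the ClassCastException is the path filter held by FileInputDStream; textFileStream always uses Spark's built-in default filter, which in a Scala 2.12 build is an anonymous function and is therefore serialized through java.lang.invoke.SerializedLambda in the checkpoint. Purely as an illustrative sketch (the class name, helper name and newFilesOnly value below are my own, and it is not established here whether this sidesteps the shell's deserialization problem), the fileStream overload lets the caller pass the filter as a named, serializable Function1 instead of a lambda:
> {code:scala}
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.dstream.DStream
>
> // A named, serializable Function1[Path, Boolean] rather than an anonymous lambda;
> // it mirrors FileInputDStream's default behaviour of skipping hidden files.
> class VisibleFilesFilter extends (Path => Boolean) with Serializable {
>   override def apply(path: Path): Boolean = !path.getName.startsWith(".")
> }
>
> // fileStream, unlike textFileStream, exposes the filter to the caller.
> def lines(ssc: StreamingContext, dir: String): DStream[String] =
>   ssc.fileStream[LongWritable, Text, TextInputFormat](
>     dir, new VisibleFilesFilter, newFilesOnly = true).map(_._2.toString)
> {code}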