[
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493749#comment-14493749
]
Sean Owen commented on SPARK-6847:
----------------------------------
Yeah, that doesn't quite help since it is not clear where it starts; the top
of the trace may have been lost.
Given the observations, the problem may be that all of the input data goes into
one key, effectively making each RDD one record, which is then checkpointed
infrequently, so Spark ends up serializing a large object. "Large" isn't
the problem by itself, but whatever it is seems to have a long object dependency
graph, maybe a linked list of blocks for example. That would explain why not
checkpointing, or checkpointing at smaller intervals, could make the difference.
How about also turning down the checkpoint interval?
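For illustration, a minimal sketch of that idea: the same set-up as the sample
code quoted below, but checkpointing the stateful DStream directly at the batch
interval so its lineage is truncated every batch (the variable names mirror the
reporter's code; the direct checkpoint call is the suggested change, not
something from the original report):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("test")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("checkpoint")

val source = ssc.socketTextStream("localhost", 9999)
val updatedResult = source
  .map((1, _))
  .updateStateByKey((newValues: Seq[String], oldState: Option[String]) =>
    newValues.headOption.orElse(oldState))

// Checkpoint the state stream itself, not only the mapped result downstream,
// so the lineage of the stateful DStream does not grow across batches.
updatedResult.checkpoint(Seconds(10))

updatedResult.map(_._2).foreachRDD((rdd, time) =>
  println(time.toString + ": " + rdd.count()))

ssc.start()
ssc.awaitTermination()
{code}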
Ideally it shouldn't occur, but this might be pushing the intended usage a bit
far by having as skewed a data distribution as possible. Does this come up in
real usage? You'd generally expect the data per key per interval to be smallish.
> Stack overflow on updateStateByKey which is followed by a dstream with
> checkpoint set
> ----------------------------------------------------------------------------------
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.0
> Reporter: Jack Hu
> Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code, which uses {{updateStateByKey}}
> followed by a {{map}} with a checkpoint interval of 10 seconds:
> {code}
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("""checkpoint""")
> val source = streamingContext.socketTextStream("localhost", 9999)
> val updatedResult = source
>   .map((1, _))
>   .updateStateByKey((newlist: Seq[String], oldstate: Option[String]) =>
>     newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
>   .checkpoint(Seconds(10))
>   .foreachRDD((rdd, t) => {
>     println("Deep: " + rdd.toDebugString.split("\n").length)
>     println(t.toString() + ": " + rdd.collect.length)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency graph keeps growing over
> time, the {{updateStateByKey}} state never gets check-pointed, and eventually
> a stack overflow happens.
> Note:
> * The RDD in {{updatedResult.map(_._2)}} gets check-pointed in this case, but
> the {{updateStateByKey}} state does not
> * If the {{checkpoint(Seconds(10))}} is removed from the mapped result
> ({{updatedResult.map(_._2)}}), the stack overflow does not happen