[https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150893#comment-15150893]
krishna ramachandran commented on SPARK-13349:
----------------------------------------------
I have a simple synthetic example below. I created two "raw streams"; job 1 is
materialized when stream1 is output (by some action such as print/save).
In job 1:
val stream1 = ssc.union(rawStreams).filter(_.contains("Stream:first"))
stream1.print()  // some output action (print/save) that materializes job 1
.......
......
In job 2, create another split from rawStreams and union it with stream1:
val stream2 = ssc.union(rawStreams).filter(_.contains("Batch:second"))
val stream3 = stream1.union(stream2)
......
stream3.print()  // some output action (print/save) that materializes job 2
Job 2 is materialized and executed. This pattern repeats for every batch.
Looking at the visual DAG, I see that job 1 executes the first graph and job 2
computes both "stream1" and "stream2".
Caching the DStream stream1 (the result of job 1) makes job 2 run almost twice as fast.
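The recomputation described above can be illustrated with a toy model in plain Scala, under the assumption that a batch is just a Seq[String] and each job is an ordinary function over it. `RecomputeDemo`, `stream1Of`, `stream2Of` and `runBatch` are hypothetical names, not Spark API. Counting how often the stream1 filter runs shows why caching helps: without reuse, job 2 evaluates the stream1 filter again, so its cost is paid twice per batch.

```scala
// Toy model, not Spark API: a batch is a Seq[String], each job a plain function.
object RecomputeDemo {
  // Number of records examined by the stream1 filter (a stand-in for compute cost).
  var filterCalls = 0

  // stream1: records from the "first" split; counts every record it examines.
  def stream1Of(batch: Seq[String]): Seq[String] =
    batch.filter { line => filterCalls += 1; line.contains("Stream:first") }

  // stream2: records from the "second" split.
  def stream2Of(batch: Seq[String]): Seq[String] =
    batch.filter(_.contains("Batch:second"))

  /** Runs job 1 and job 2 for one batch; returns (job 1 output, job 2 output). */
  def runBatch(batch: Seq[String], cacheStream1: Boolean): (Seq[String], Seq[String]) = {
    val s1Job1 = stream1Of(batch) // job 1 materializes stream1
    // Job 2 reuses job 1's result only if it was cached; otherwise it recomputes it.
    val s1Job2 = if (cacheStream1) s1Job1 else stream1Of(batch)
    (s1Job1, s1Job2 ++ stream2Of(batch)) // stream3 = stream1 union stream2
  }
}
```

With a three-record batch, the uncached run examines six records in the stream1 filter and the cached run examines three, mirroring the "almost twice as fast" observation.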
In our real application we have 7 such jobs per batch, and typically we union the
output of job 5 with that of job 1; that is, we union the output of job 1 with a
stream generated during job 5.
Caching and reusing the output of job 1 (stream1) is very efficient (per-batch
execution is 2.5 times faster), but we start seeing out-of-memory errors.
I would like to be able to "unpersist" stream1 after the union (for that batch).
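A minimal sketch of the per-batch release being asked for, again as a plain-Scala toy rather than Spark code: the hypothetical `BlockStore` stands in for cached blocks keyed by batch time, and `UnpersistDemo.runBatches` is likewise a hypothetical name. Keeping every batch's cached stream1 makes the store grow by one entry per batch (a stand-in for the memory growth reported above), while removing the entry right after the union keeps it bounded. In Spark itself, the closest analogue would presumably be persisting and unpersisting the batch's RDD inside `foreachRDD` instead of a DStream-level call; that mapping is an assumption and is not shown in this sketch.

```scala
import scala.collection.mutable

// Toy model, not Spark API: BlockStore stands in for cached blocks keyed by batch time.
object UnpersistDemo {
  final class BlockStore {
    private val blocks = mutable.Map.empty[Long, Seq[String]]
    def put(batchTime: Long, data: Seq[String]): Unit = blocks.update(batchTime, data)
    def get(batchTime: Long): Seq[String] = blocks(batchTime)
    def remove(batchTime: Long): Unit = { blocks.remove(batchTime); () }
    def size: Int = blocks.size
  }

  /** Processes `n` batches; returns (records written, cached blocks still held). */
  def runBatches(n: Int, unpersistAfterUnion: Boolean): (Int, Int) = {
    val store = new BlockStore
    var written = 0
    for (t <- 0L until n.toLong) {
      val batch = Seq(s"Stream:first $t", s"Batch:second $t")
      // Job 1: compute stream1 once and cache it for this batch.
      store.put(t, batch.filter(_.contains("Stream:first")))
      // Job 2: reuse the cached stream1 and union it with stream2.
      val stream3 = store.get(t) ++ batch.filter(_.contains("Batch:second"))
      written += stream3.size
      // Release this batch's stream1 as soon as the union has consumed it.
      if (unpersistAfterUnion) store.remove(t)
    }
    (written, store.size)
  }
}
```

Both runs write the same output; only the number of cached blocks left behind differs (one per batch versus none).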
> adding a split and union to a streaming application cause big performance hit
> -----------------------------------------------------------------------------
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
> Issue Type: Improvement
> Affects Versions: 1.4.1
> Reporter: krishna ramachandran
> Priority: Critical
> Fix For: 1.4.2
>
>
> We have a streaming application containing approximately 12 jobs every batch,
> running in streaming mode (4 sec batches). Each job writes output to Cassandra,
> and each job can contain several stages.
> Job 1:
> receive Stream A --> map --> filter --> (union with another stream B) -->
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more jobs of transforms and save to the database.
> Around stage 5, we union the output DStream from job 1 (in red) with
> another stream (generated by a split during job 2) and save that state.
> It appears that the whole execution up to this point is repeated, which is
> redundant (I can see this in the execution graph and also in the performance
> metrics, i.e. processing time). Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5
> times slower compared to runs without the union, even though for most batches
> the union does not alter the original DStream (union with an empty set). If I
> cache the DStream from job 1 (red block output), performance improves
> substantially, but we hit out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is
> no DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and calling
> streamingContext.remember("duration") did not help. We are still seeing
> out-of-memory errors.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)