[ 
https://issues.apache.org/jira/browse/SPARK-11749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-11749:
---------------------------------
    Affects Version/s:     (was: 1.5.0)
                       1.6.0

> Duplicate creating the RDD in file stream when recovering from checkpoint data
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-11749
>                 URL: https://issues.apache.org/jira/browse/SPARK-11749
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.0
>            Reporter: Jack Hu
>            Assignee: Jack Hu
>             Fix For: 1.6.0
>
>
> I have a job that monitors an HDFS folder, enriches the incoming data by
> joining it with reference tables (about 15 of them), and writes the results
> to different Hive tables after some operations.
> The code looks like this:
> {code}
> val txt = 
> ssc.textFileStream(folder).map(toKeyValuePair).reduceByKey(removeDuplicates)
> val refTable1 = 
> ssc.textFileStream(refSource1).map(parse(_)).updateStateByKey(...)
> txt.join(refTable1).map(..).reduceByKey(...).foreachRDD(
>   rdd => {
>      // insert into hive table
>   }
> )
> val refTable2 = 
> ssc.textFileStream(refSource2).map(parse(_)).updateStateByKey(...)
> txt.join(refTable2).map(..).reduceByKey(...).foreachRDD(
>   rdd => {
>      // insert into hive table
>   }
> )
> /// more refTables in following code
> {code}
>  
> The {{batchInterval}} of this application is set to *30 seconds*, the
> checkpoint interval is set to *10 minutes*, and every batch in {{txt}} has
> *60 files*.
> After recovering from checkpoint data, I see many log messages about creating
> RDDs in the file stream: the RDD for each batch of the file stream was
> recreated *15 times*, and it took about *5 minutes* to create that many file
> RDDs. During this period, *10K+ broadcasts* were created and used up almost
> all of the block manager space.
> After some investigation, we found that {{DStream.restoreCheckpointData}}
> is invoked once for each output ({{DStream.foreachRDD}} in this case), and
> there is no flag to indicate that a {{DStream}} has already been restored, so
> the RDDs in the file stream were recreated on every invocation.
> I suggest adding a flag to control the restore process and avoid the
> duplicated work.
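
The suggested flag can be illustrated with a small, simplified model. This is only a sketch under stated assumptions: it does not use Spark's classes or reproduce the actual patch, and the names Node, guarded, restoreCount and sourceRestores are hypothetical. It models one shared source stream feeding 15 outputs and counts how often the source's restore work runs with and without the flag.

```scala
// Simplified model of a stream graph. NOT Spark's actual DStream code;
// all names here are hypothetical and exist only for illustration.
object RestoreGuardSketch {

  /** A node in the graph. `guarded` switches the proposed flag on or off. */
  class Node(val name: String, val parents: Seq[Node], guarded: Boolean) {
    var restoreCount = 0          // how many times the restore work actually ran
    private var restored = false  // the proposed "already restored" flag

    def restoreCheckpointData(): Unit = {
      // Without the guard, every caller repeats the full restore.
      if (!(guarded && restored)) {
        restoreCount += 1         // stands in for recreating the file RDDs
        parents.foreach(_.restoreCheckpointData())
        restored = true
      }
    }
  }

  /** Restores `numOutputs` outputs that share one source and returns
    * how many times the shared source was restored. */
  def sourceRestores(numOutputs: Int, guarded: Boolean): Int = {
    val source = new Node("txt", Seq.empty, guarded)
    val outputs = (1 to numOutputs).map { i =>
      new Node(s"output-$i", Seq(source), guarded)
    }
    outputs.foreach(_.restoreCheckpointData())
    source.restoreCount
  }

  def main(args: Array[String]): Unit = {
    println(s"without flag: ${sourceRestores(15, guarded = false)}") // 15
    println(s"with flag:    ${sourceRestores(15, guarded = true)}")  // 1
  }
}
```

In this model the shared source is restored once per output when there is no flag, which matches the 15 recreations reported above, and exactly once when the flag is checked first.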



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
