[ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291214#comment-14291214
 ] 

Imran Rashid commented on SPARK-2688:
-------------------------------------

[~airhorns]
I completely agree with your use case, of making your pipeline robust to a few 
bad records, but still being able to track those errors and look at them later. 
 I've had to deal with this a lot myself, and my solution has been to use 
accumulators as a way to keep track of small additional outputs.  Eg., I'd do 
something like:

{code}
val errors = sc.accumulableCollection(mutable.Set[String]())
val parsedDataRDD = rawLogsRdds.flatMap{rawLog =>
  try {
    val parse = MyParser.parse(rawLog)
    ...
    Some(result)
  } catch {
    case NonFatal(ex) =>
      errors += rawLog // or maybe you want to keep just the exception, or both
      None
  }
}
{code}

Note that accumulators don't really scale very well, though.  If you expect a 
lot of errors, you might just want to keep a sample of them and a count of the 
total number.  Then after the next action, you can look at the errors & the 
total count, and perhaps even fail if the number of errors is more than you can 
tolerate.

> Need a way to run multiple data pipeline concurrently
> -----------------------------------------------------
>
>                 Key: SPARK-2688
>                 URL: https://issues.apache.org/jira/browse/SPARK-2688
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 1.0.1
>            Reporter: Xuefu Zhang
>
> Suppose we want to do the following data processing: 
> {code}
> rdd1 -> rdd2 -> rdd3
>            | -> rdd4
>            | -> rdd5
>            \ -> rdd6
> {code}
> where -> represents a transformation. rdd3 to rrdd6 are all derived from an 
> intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
> execution. However, rdd.foreach(fn) only trigger pipeline rdd1 -> rdd2 -> 
> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
> recomputed. This is very inefficient. Ideally, we should be able to trigger 
> the execution the whole graph and reuse rdd2, but there doesn't seem to be a 
> way doing so. Tez already realized the importance of this (TEZ-391), so I 
> think Spark should provide this too.
> This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to