[ 
https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687668#comment-15687668
 ] 

Reynold Xin commented on SPARK-12469:
-------------------------------------

Sorry, there is no way to get this into 2.1 given the size of the change.


> Data Property Accumulators for Spark (formerly Consistent Accumulators)
> -----------------------------------------------------------------------
>
>                 Key: SPARK-12469
>                 URL: https://issues.apache.org/jira/browse/SPARK-12469
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: holdenk
>
> Tasks executed on Spark workers cannot modify values on the driver; 
> accumulators are the one exception. Accumulators in Spark are implemented 
> in such a way that when a stage is recomputed (say, after cache eviction) 
> the accumulator is updated a second time. This makes accumulators inside 
> transformations harder to use for things like counting invalid records 
> (one of the primary potential use cases of collecting side information 
> during a transformation). However, in some cases this counting during 
> re-evaluation is exactly the behaviour we want (say, when tracking total 
> execution time for a particular function). Spark would benefit from a 
> version of accumulators that does not double count even if stages are 
> re-executed.
> Motivating example:
> {code}
> import scala.util.Try
>
> val parseTime = sc.accumulator(0L)
> val parseFailures = sc.accumulator(0L)
> val parsedData = sc.textFile(...).flatMap { line =>
>   val start = System.currentTimeMillis()
>   val parsed = Try(parse(line))
>   if (parsed.isFailure) parseFailures += 1
>   parseTime += System.currentTimeMillis() - start
>   parsed.toOption
> }
> parsedData.cache()
> val resultA = parsedData.map(...).filter(...).count()
> // some intervening code.  Almost anything could happen here -- some of
> // parsedData may get kicked out of the cache, or an executor where data
> // was cached might get lost
> val resultB = parsedData.filter(...).map(...).flatMap(...).count()
> // now we look at the accumulators
> {code}
> Here we would want parseFailures to have been incremented only once for 
> every line that failed to parse.  Unfortunately, the current Spark 
> accumulator API doesn’t support this parseFailures use case: if some data 
> has been evicted, it's possible that it will be double counted.
> See the full design document at 
> https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing
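
The double counting described above can be reproduced without a cluster. The following is a minimal, Spark-free sketch: OncePerPartitionCounter, DoubleCountDemo and parseFailuresIn are hypothetical names made up for illustration, and keying updates by partition id is an assumption made here, not necessarily the approach taken in the linked design document. It compares a naive counter with one that keeps only the latest report per partition when a partition is executed twice.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical model (not Spark's API): keeps one value per partition,
// so a re-executed partition replaces its earlier report instead of
// adding to it.
final class OncePerPartitionCounter {
  private val byPartition = mutable.Map.empty[Int, Long]

  // Called once per completed task with that task's local total.
  def report(partitionId: Int, taskTotal: Long): Unit =
    byPartition(partitionId) = taskTotal

  def value: Long = byPartition.values.sum
}

object DoubleCountDemo {
  // Stand-in for parse(line): only integer strings parse successfully.
  def parseFailuresIn(lines: Seq[String]): Long =
    lines.count(line => Try(line.trim.toInt).isFailure).toLong

  val partitions: Map[Int, Seq[String]] = Map(
    0 -> Seq("1", "oops", "3"),
    1 -> Seq("x", "y", "6")
  )

  // Partition 0 runs twice, as if its cached block had been evicted.
  val executions: Seq[Int] = Seq(0, 1, 0)

  // Returns (naive total, once-per-partition total).
  def run(): (Long, Long) = {
    var naive = 0L
    val once = new OncePerPartitionCounter
    for (pid <- executions) {
      val failures = parseFailuresIn(partitions(pid))
      naive += failures          // counts the recomputation again
      once.report(pid, failures) // overwrites the earlier report
    }
    (naive, once.value)
  }
}
```

Calling DoubleCountDemo.run() yields a naive total of 4 and a once-per-partition total of 3, since three lines fail to parse and one of them sits in the partition that is executed twice. The sketch relies on a re-executed partition producing the same result each time; a timing measurement such as parseTime is exactly the case where the naive, additive behaviour is the one wanted.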



