Accumulators are generally unreliable and should not be used for exact
counts. The answers to (2) and (4) are yes. The answer to (3) is both.
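
To make that concrete, here's a minimal sketch (assuming a local
SparkContext; it uses the same sc.accumulator API as your snippet, which is
deprecated in Spark 2.x) of how an accumulator updated in a transformation
over-counts when the RDD is recomputed, while an update made inside an
action is applied exactly once:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("acc-demo").setMaster("local[2]"))

val inTransform = sc.accumulator(0)
val data = sc.parallelize(1 to 100)

// Update inside a transformation: runs once per *evaluation* of the map.
val mapped = data.map { x => inTransform.add(1); x }
mapped.count()                 // first evaluation: inTransform.value == 100
mapped.count()                 // mapped is not cached, so the map re-runs
println(inTransform.value)     // prints 200, not 100

// Update inside an action: each task's update is applied exactly once,
// even if a task is re-run (failure, speculative execution).
val inAction = sc.accumulator(0)
data.foreach(_ => inAction.add(1))
println(inAction.value)        // prints 100

sc.stop()

Caching mapped between the two counts would hide the over-count here, but
recomputation after cache eviction, or a task retry inside a
transformation, can still inflate the values.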

Here's a more in-depth explanation:
http://imranrashid.com/posts/Spark-Accumulators/

On Sun, Dec 11, 2016 at 11:27 AM, Sudev A C <sudev...@goibibo.com> wrote:

> Please help.
> Anyone, any thoughts on the previous mail ?
>
> Thanks
> Sudev
>
>
> On Fri, Dec 9, 2016 at 2:28 PM Sudev A C <sudev...@goibibo.com> wrote:
>
>> Hi,
>>
>> Can anyone please help clarify how accumulators can be used reliably
>> to measure error/success/analytical metrics?
>>
>> Given below is the use case / code snippet that I have.
>>
>> val somePos = 4  // position of the amount column in the Row
>> val amtZero = sc.accumulator(0)
>> val amtLarge = sc.accumulator(0)
>> val amtNormal = sc.accumulator(0)
>>
>> // Note: these updates happen inside a transformation (the maps below).
>> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos)) {
>>   amtZero.add(1)
>>   0.0
>> } else {
>>   val amount = x.getDouble(somePos)
>>   if (amount > 10000) amtLarge.add(1) else amtNormal.add(1)
>>   amount
>> }
>>
>> val mrdd = rdd.map(s => (s, getAmount(s)))
>> mrdd.cache()
>> val another_mrdd = rdd2.map(s => (s, getAmount(s)))
>>
>> // save_to_redshift is a placeholder for our own sink.
>> mrdd.save_to_redshift
>> another_mrdd.save_to_redshift
>> // map(identity) stands in for the real transformation here.
>> mrdd.union(another_mrdd).map(identity).groupByKey().save_to_redshift
>>
>> // Get values from the accumulators and persist them to a reliable
>> // store for analytics (save_to_datastore is a placeholder).
>> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)
>>
>> A few questions:
>>
>> 1. How many times should I expect items within mrdd and another_mrdd
>> to be counted, given that both of these RDDs are reused? What happens
>> when a part of the DAG is skipped due to caching in between (say I'm
>> caching only mrdd)?
>>
>> 2. Should I be worried about stage/task failures (due to master-worker
>> network issues, resource starvation, or speculative execution)? Can
>> these events lead to wrong counts?
>>
>> 3. The documentation says: **In transformations, users should be aware
>> that each task's update may be applied more than once if tasks or job
>> stages are re-executed.**
>> Is the re-execution of stages/tasks here referring only to re-execution
>> after failures, or also to re-execution due to a stage's/task's
>> position in the DAG?
>>
>> 4. Is it safe to say that using accumulators for exact counts is
>> limited to updates inside actions such as .foreach(), since actions
>> guarantee exactly-once updates?
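>> For example, is the only reliable pattern a separate pass per metric,
>> something like the rough sketch below (reusing the somePos column from
>> the snippet above; exactZero is a hypothetical name, for illustration)?
>>
>> val exactZero = sc.accumulator(0)  // hypothetical, for illustration
>> // Update inside an action: applied exactly once per task.
>> rdd.foreach(x => if (x.isNullAt(somePos)) exactZero.add(1))
>> println(exactZero.value)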
>>
>> Thanks
>> Sudev