Accumulators are generally unreliable and should not be used. The answers to (2) and (4) are yes. The answer to (3) is both.
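If you need exact counts, a more robust pattern is to compute them as ordinary data with a reduction instead of as accumulator side effects, so that recomputation of a partition cannot inflate the result. A minimal sketch (names like `classify` are mine, not from the thread; this assumes a live SparkContext and is not runnable standalone):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Classify each row into a bucket instead of bumping a shared counter.
// somePos is the column index used in the original snippet.
def classify(x: Row, somePos: Int): String =
  if (x.isNullAt(somePos)) "zero"
  else if (x.getDouble(somePos) > 10000) "large"
  else "normal"

// Counting via reduceByKey: re-executed tasks just recompute the same
// partial counts, so each record contributes exactly once to the result.
def bucketCounts(rdd: RDD[Row], somePos: Int): Map[String, Long] =
  rdd.map(x => (classify(x, somePos), 1L))
     .reduceByKey(_ + _)
     .collectAsMap()
     .toMap
```

The counts come back through an action (`collectAsMap`), so they inherit Spark's exactly-once result semantics rather than the at-least-once update semantics of accumulators in transformations.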
Here's a more in-depth explanation:
http://imranrashid.com/posts/Spark-Accumulators/

On Sun, Dec 11, 2016 at 11:27 AM, Sudev A C <sudev...@goibibo.com> wrote:

> Please help.
> Anyone, any thoughts on the previous mail ?
>
> Thanks
> Sudev
>
>
> On Fri, Dec 9, 2016 at 2:28 PM Sudev A C <sudev...@goibibo.com> wrote:
>
>> Hi,
>>
>> Can anyone please help clarify how accumulators can be used reliably
>> to measure error/success/analytical metrics?
>>
>> Given below is the use case / code snippet that I have.
>>
>> val amtZero = sc.accumulator(0)
>> val amtLarge = sc.accumulator(0)
>> val amtNormal = sc.accumulator(0)
>> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos)) {
>>   amtZero.add(1)
>>   0.0
>> } else {
>>   val amount = x.getDouble(somePos)
>>   if (amount > 10000) amtLarge.add(1) else amtNormal.add(1)
>>   amount
>> }
>> val mrdd = rdd.map(s => (s, getAmount(s)))
>> mrdd.cache()
>> val another_mrdd = rdd2.map(s => (s, getAmount(s)))
>> mrdd.save_to_redshift
>> another_mrdd.save_to_redshift
>> mrdd.union(another_mrdd).map().groupByKey().save_to_redshift
>>
>> // Get values from accumulators and persist them to a reliable store
>> // for analytics.
>> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)
>>
>> A few questions:
>>
>> 1. How many times should I expect the counts for items within mrdd and
>> another_mrdd, since both of these RDDs are being reused? What happens
>> when a part of the DAG is skipped due to caching in between (say I'm
>> caching only mrdd)?
>>
>> 2. Should I be worried about possible stage/task failures (due to
>> master-worker network issues, resource starvation, or speculative
>> execution)? Can these events lead to wrong counts?
>>
>> 3. The documentation says: **In transformations, users should be aware
>> that each task's update may be applied more than once if tasks or job
>> stages are re-executed.** Does this re-execution of stages/tasks refer
>> to re-execution after failures, or re-execution due to the stage's/task's
>> position in the DAG?
>>
>> 4. Is it safe to say that the use of accumulators (for exact counts) is
>> limited to .foreach(), since actions guarantee exactly-once updates?
>>
>> Thanks
>> Sudev
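To make "both" in the answer to (3) concrete: in the snippet above, re-execution happens even with no failures at all, simply because of where the map sits in the DAG. A non-runnable sketch annotating the original action sequence (`save_to_redshift` is the thread's own pseudocode):

```scala
// mrdd is cached, so getAmount runs once per row of rdd; later actions
// read the cached partitions instead of recomputing them (unless a
// cached partition is evicted, in which case it is recomputed and its
// accumulator updates are applied a second time).
mrdd.save_to_redshift

// another_mrdd is NOT cached: this action computes it, running getAmount
// once per row of rdd2 and applying the accumulator updates.
another_mrdd.save_to_redshift

// This third action recomputes another_mrdd from scratch, so getAmount
// runs AGAIN for every row of rdd2 -- those rows are now counted twice,
// with no failure, retry, or speculation involved.
mrdd.union(another_mrdd).groupByKey().save_to_redshift
```

On top of this DAG-driven recomputation, any task retry or speculative duplicate re-applies updates as well, which is why accumulators in transformations give at-least-once rather than exactly-once counts.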