[
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mridul Muralidharan reassigned SPARK-41497:
-------------------------------------------
Assignee: Tengfei Huang
> Accumulator undercounting in the case of retry task with rdd cache
> ------------------------------------------------------------------
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
> Reporter: wuyi
> Assignee: Tengfei Huang
> Priority: Major
>
> An accumulator can be undercounted when a retried task has an rdd cache. See
> the example below; a complete and reproducible example is also available at
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors.
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]")
>     .setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task
>   // attempt of the job submitted below. In particular, the failed first
>   // attempt would succeed in its computation (accumulator accounting,
>   // result caching) but fail to report its success status due to a
>   // concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and
>   // specify the storage level MEMORY_ONLY_2 so that the rdd result will be
>   // cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed.
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the
>   // second task attempt: the attempt loads the rdd cache directly instead
>   // of executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> }
> {code}
>
> We can also hit this issue with decommissioning even if the rdd has only one
> copy. For example, decommissioning could migrate the rdd cache block to
> another executor (the effect is then the same as having 2 copies), and the
> decommissioned executor could be lost before the task reports its success
> status to the driver.
>
> The issue is also more complicated to fix than expected. I have tried
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task. In practice,
> this already fixes the issue in most cases. However, since block reporting
> is asynchronous, an rdd cache block could theoretically be reported to the
> driver right after the driver cleans up the failed task's caches, so this
> option can't resolve the issue thoroughly. A minimal sketch of the idea
> (and its race window) follows.
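>
> To illustrate Option 1, here is a minimal, self-contained sketch in plain
> Scala (not Spark's real APIs; `CacheTracker`, `BlockId`, and the method
> names are hypothetical) that models the cleanup and shows where the race
> window sits:
> {code:scala}
> import scala.collection.mutable
>
> // Hypothetical driver-side bookkeeping of which partitions are cached.
> case class BlockId(rddId: Int, partitionId: Int)
>
> class CacheTracker {
>   private val blocks = mutable.Set[BlockId]()
>
>   // Executors report cached blocks asynchronously.
>   def reportBlock(id: BlockId): Unit = synchronized { blocks += id }
>
>   // Option 1: on a task failure, drop the failed task's cache blocks so a
>   // retry recomputes the partition (re-running the accumulator updates).
>   def onTaskFailed(rddId: Int, partitionId: Int): Unit = synchronized {
>     blocks -= BlockId(rddId, partitionId)
>   }
>   // Race window: a reportBlock() from the failed attempt may arrive right
>   // after onTaskFailed() ran, re-registering the stale block. This is why
>   // the option cannot resolve the issue thoroughly.
> }
> {code}
>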
> Option 2: Disallow rdd cache reuse across task attempts of the same task.
> This option fixes the issue completely. The problem is that it also affects
> cases where the cache could safely be reused across attempts (e.g., when
> there is no accumulator operation in the task), which can cause a perf
> regression. A sketch follows.
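>
> A minimal sketch of Option 2, assuming a hypothetical attempt-scoped cache
> (`AttemptScopedCache` and `TaskId` are illustrative names, not Spark
> internals):
> {code:scala}
> import scala.collection.mutable
>
> case class TaskId(stageId: Int, partitionId: Int)
>
> // Key the cache by (task, attempt number) so a retry never sees a block
> // written by an earlier attempt.
> class AttemptScopedCache {
>   private val cache = mutable.Map[(TaskId, Int), Seq[Int]]()
>
>   def put(task: TaskId, attempt: Int, data: Seq[Int]): Unit =
>     cache((task, attempt)) = data
>
>   // A retry (a different attempt number) always misses and must recompute,
>   // which guarantees the accumulator updates run again, at the cost of
>   // losing legitimate cache reuse when the task has no accumulators.
>   def get(task: TaskId, attempt: Int): Option[Seq[Int]] =
>     cache.get((task, attempt))
> }
> {code}
>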
> Option 3: Introduce an accumulator cache. First, this requires a new
> framework to support the accumulator cache; second, the driver needs extra
> logic to decide whether a cached accumulator value should be reported to
> the user, to avoid overcounting. For example, in the case of a task retry
> the value should be reported, but in the case of rdd cache reuse it
> shouldn't be (should it?). A sketch follows.
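>
> A sketch of Option 3, assuming a hypothetical `AccumulatorCache` on the
> driver (names and structure are illustrative only):
> {code:scala}
> import scala.collection.mutable
>
> // Remember the accumulator updates produced when a partition was first
> // computed, and replay them when a later attempt serves that partition
> // from the rdd cache (where the user function, and thus the updates, are
> // skipped).
> class AccumulatorCache {
>   // (rddId, partitionId) -> accumulator name -> recorded update
>   private val updates = mutable.Map[(Int, Int), Map[String, Long]]()
>
>   def record(rddId: Int, partition: Int, u: Map[String, Long]): Unit =
>     updates((rddId, partition)) = u
>
>   // The hard part is not shown: the driver must decide whether the
>   // replayed value should be merged into the user-visible accumulator
>   // (task retry: yes; plain cache reuse by another job: unclear).
>   def replay(rddId: Int, partition: Int): Map[String, Long] =
>     updates.getOrElse((rddId, partition), Map.empty)
> }
> {code}
>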
> Option 4: Validate task success when a task tries to load the rdd cache.
> This approach defines an rdd cache as valid/accessible only if the task
> that produced it has succeeded. It could be either overkill or a bit
> complex, because Spark currently cleans up task state once a task finishes,
> so we would need to maintain a structure recording whether a task ever
> succeeded. A sketch follows.
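>
> A sketch of Option 4, assuming a hypothetical `ValidatedCache` (again,
> illustrative names, not Spark's block manager API):
> {code:scala}
> import scala.collection.mutable
>
> // A cache block is served only if the task that wrote it is known to have
> // succeeded; otherwise the reader recomputes the partition.
> class ValidatedCache {
>   private val data = mutable.Map[(Int, Int), Seq[Int]]()  // (rddId, part)
>   private val succeeded = mutable.Set[(Int, Int)]()       // finished tasks
>
>   def put(rddId: Int, part: Int, d: Seq[Int]): Unit =
>     data((rddId, part)) = d
>
>   def markTaskSucceeded(rddId: Int, part: Int): Unit =
>     succeeded += ((rddId, part))
>
>   // The extra bookkeeping is the cost: Spark normally drops per-task state
>   // once a task finishes, so this success set must be kept around.
>   def get(rddId: Int, part: Int): Option[Seq[Int]] =
>     if (succeeded((rddId, part))) data.get((rddId, part)) else None
> }
> {code}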