[
https://issues.apache.org/jira/browse/SPARK-34779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Attila Zsolt Piros reassigned SPARK-34779:
------------------------------------------
Assignee: Baohe Zhang
> ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat
> occurs
> -----------------------------------------------------------------------------------
>
> Key: SPARK-34779
> URL: https://issues.apache.org/jira/browse/SPARK-34779
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
> Reporter: Baohe Zhang
> Assignee: Baohe Zhang
> Priority: Major
>
> The current implementation of ExecutorMetricsPoller uses the task count in
> each stage to decide whether to keep a stage entry. When an executor has
> only 1 core, this can cause:
> # Peak metrics missing (due to stage entry being removed within a heartbeat
> interval)
> # Unnecessary and frequent hashmap entry removal and insertion.
> Assuming an executor with 1 core has 2 tasks (task1 and task2, both
> belonging to stage (0, 0)) to execute within a heartbeat interval, the
> workflow in the current ExecutorMetricsPoller implementation would be:
> 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count
> incremented to 1
> 2. 1st poll() -> update peak metrics of stage (0, 0)
> 3. task1 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry
> removed, peak metrics lost.
> 4. task2 start -> stage (0, 0) entry created in stageTCMP, task count
> incremented to 1
> 5. 2nd poll() -> update peak metrics of stage (0, 0)
> 6. task2 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry
> removed, peak metrics lost
> 7. heartbeat() -> empty or inaccurate peak metrics reported for stage (0, 0).
> We can fix the issue by keeping entries with task count = 0 in stageTCMP
> until a heartbeat occurs. At the heartbeat, after reporting the peak metrics
> for each stage, we scan stageTCMP and remove entries whose task count is 0.
> After the fix, the workflow would be:
> 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count
> incremented to 1
> 2. 1st poll() -> update peak metrics of stage (0, 0)
> 3. task1 end -> stage (0, 0) task count decremented to 0, but the (0, 0)
> entry remains.
> 4. task2 start -> task count of stage (0, 0) incremented to 1
> 5. 2nd poll() -> update peak metrics of stage (0, 0)
> 6. task2 end -> stage (0, 0) task count decremented to 0, but the (0, 0)
> entry remains.
> 7. heartbeat() -> accurate peak metrics for stage (0, 0) reported. The
> stage (0, 0) entry is removed from stageTCMP because its task count is 0.
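> The proposed entry lifecycle can be sketched as a small standalone model (a
> hypothetical simplification for illustration only; the real
> ExecutorMetricsPoller also tracks peak metric values per stage entry):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Minimal model of the proposed stageTCMP behavior: entries whose task
// count has dropped to 0 survive until the next heartbeat, which reports
// all tracked stages and only then prunes zero-count entries.
class StageTaskCountMap {
  // (stageId, stageAttemptId) -> number of currently running tasks
  private val stageTCMP = new ConcurrentHashMap[(Int, Int), AtomicLong]()

  def onTaskStart(stage: (Int, Int)): Unit =
    stageTCMP.computeIfAbsent(stage, _ => new AtomicLong(0)).incrementAndGet()

  // Decrement only; do NOT remove the entry here, so peak metrics polled
  // between task end and the next heartbeat are not lost.
  def onTaskEnd(stage: (Int, Int)): Unit =
    stageTCMP.get(stage).decrementAndGet()

  // Report all tracked stages, then prune entries with task count 0.
  def heartbeat(): Seq[(Int, Int)] = {
    import scala.jdk.CollectionConverters._
    val reported = stageTCMP.keySet.asScala.toSeq
    stageTCMP.entrySet.removeIf(e => e.getValue.get() == 0)
    reported
  }

  def contains(stage: (Int, Int)): Boolean = stageTCMP.containsKey(stage)
}
```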
>
> How to verify the behavior?
> Submit a job with a custom polling interval (e.g., 2s) and
> spark.executor.cores=1, then check the debug logs of ExecutorMetricsPoller.
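> A verification run might look like the following (the polling-interval
> config name and example jar path are assumptions based on standard Spark 3.x
> settings; adjust to your deployment):

```shell
# Assumed config names: spark.executor.metrics.pollingInterval (Spark 3.0+)
# controls the ExecutorMetricsPoller interval; 0 disables separate polling.
spark-submit \
  --conf spark.executor.cores=1 \
  --conf spark.executor.metrics.pollingInterval=2s \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples_2.12-3.1.1.jar 1000
```

> To see the relevant debug output, enable DEBUG logging for the
> org.apache.spark.executor.ExecutorMetricsPoller logger in the executor's
> log4j configuration.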
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]