[
https://issues.apache.org/jira/browse/SPARK-48394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
wuyi updated SPARK-48394:
-------------------------
Description:
There is only one valid mapstatus for a given {{mapIndex}} at any time in
Spark. {{mapIdToMapIndex}} should follow the same rule to avoid inconsistency.
The issue leads to shuffle fetch failures and, ultimately, job failure. It
happens this way:
# Stage A computes partition P0 with task t1 (TID, a.k.a. mapId) on executor
e1
# Executor Y starts decommissioning
# Executor Y is falsely reported as lost to the driver during its decommission
# Stage B reuses the shuffle dependency of Stage A and computes partition P0
again with task t2 on executor e2
# When task t2 finishes, there are two entries ((t1 -> P0), (t2 -> P0)) for the
same partition in {{mapIdToMapIndex}} but only one entry
(mapStatuses(P0)=MapStatus(t2, e2)) in {{mapStatuses}}.
# Executor Y starts to migrate task t1's mapstatus (to executor e3, for
example) and calls {{updateMapOutput}} on the driver. Given step 5, the driver
uses mapId (i.e., t1) to get mapIndex (i.e., P0) and then uses P0 to get task
t2's mapstatus.
// updateMapOutput
val mapIndex = mapIdToMapIndex.get(mapId)
val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
# The location in task t2's mapstatus is then updated to executor e3, although
the output is actually still located on executor e2. This ultimately causes
the fetch failure.
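
The sequence above can be replayed with a small toy model. This is only a
sketch, not Spark's actual code: {{ToyMapStatus}}, {{ToyShuffleStatus}},
{{StaleMappingDemo}} and the {{cleanupOnUnregister}} flag are hypothetical
names introduced for illustration, and the sketch assumes that Executor Y is
the executor e1 that ran task t1. Only the lookup inside {{updateMapOutput}}
mirrors the snippet quoted above.

```scala
import scala.collection.mutable

// Toy stand-in for a map status: which task produced it and where it lives.
final case class ToyMapStatus(mapId: Long, var location: String)

// Toy stand-in for the driver-side bookkeeping of one shuffle.
// cleanupOnUnregister is a hypothetical switch: false replays the bug,
// true removes the mapId entry when the output is unregistered.
final class ToyShuffleStatus(numPartitions: Int, cleanupOnUnregister: Boolean) {
  val mapStatuses: Array[ToyMapStatus] = new Array[ToyMapStatus](numPartitions)
  val mapIdToMapIndex: mutable.HashMap[Long, Int] = mutable.HashMap.empty

  def addMapOutput(mapIndex: Int, status: ToyMapStatus): Unit = {
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }

  // Unregister every output that is located on the given executor.
  def removeOutputsOnExecutor(executor: String): Unit = {
    for (i <- mapStatuses.indices) {
      val s = mapStatuses(i)
      if (s != null && s.location == executor) {
        if (cleanupOnUnregister) mapIdToMapIndex.remove(s.mapId)
        mapStatuses(i) = null
      }
    }
  }

  // Same lookup shape as the snippet in the description.
  def updateMapOutput(mapId: Long, newLocation: String): Unit = {
    val mapIndex = mapIdToMapIndex.get(mapId)
    val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
    mapStatusOpt.foreach(s => s.location = newLocation)
  }
}

object StaleMappingDemo {
  // Replays steps 1-7 for partition P0 (index 0) and returns the location
  // finally recorded for P0.
  def replay(cleanup: Boolean): String = {
    val status = new ToyShuffleStatus(1, cleanup)
    status.addMapOutput(0, ToyMapStatus(1L, "e1")) // t1 computes P0 on e1
    status.removeOutputsOnExecutor("e1")           // e1 falsely reported lost
    status.addMapOutput(0, ToyMapStatus(2L, "e2")) // t2 recomputes P0 on e2
    status.updateMapOutput(1L, "e3")               // migration of t1's output
    status.mapStatuses(0).location
  }

  def main(args: Array[String]): Unit = {
    println(replay(cleanup = false)) // e3: t2's status points at the wrong executor
    println(replay(cleanup = true))  // e2: the stale t1 entry is gone, update is a no-op
  }
}
```

With the stale (t1 -> P0) entry left in place, the migration update for t1
overwrites the location of t2's status. Once the entry is dropped at
unregister time, the lookup for t1 returns None and t2's status is left
untouched.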
was: There is only one valid mapstatus for the same {{mapIndex}} at the same
time in Spark. {{mapIdToMapIndex}} should also follow the same rule to avoid
chaos.
> Cleanup mapIdToMapIndex on mapoutput unregister
> -----------------------------------------------
>
> Key: SPARK-48394
> URL: https://issues.apache.org/jira/browse/SPARK-48394
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.0, 4.0.0, 3.5.1
> Reporter: wuyi
> Assignee: wuyi
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.0.0
>