[ 
https://issues.apache.org/jira/browse/SPARK-41792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-41792:
----------------------------------------
    Affects Version/s:     (was: 3.3.0)

> Shuffle merge finalization removes the wrong finalization state from the DB
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-41792
>                 URL: https://issues.apache.org/jira/browse/SPARK-41792
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.4.0
>            Reporter: Mridul Muralidharan
>            Priority: Minor
>
> During `finalizeShuffleMerge` in the external shuffle service, if the 
> finalization request is for a newer shuffle merge id, we clean up the 
> existing (older) shuffle details and add the newer entry (for which we got no 
> pushed blocks) to the DB.
> Unfortunately, when cleaning up the DB, we use the wrong 
> AppAttemptShuffleMergeId: we remove the entry for the latest shuffle merge id 
> instead of the existing (older) entry.
> Proposed Fix:
> {code}
> diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> index 816d1082850..551104d0eba 100644
> --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> @@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
>          } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
>            // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
>            // empty MergeStatuses but cleanup the older shuffleMergeId files.
> +          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
> +                  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
>            submitCleanupTask(() ->
>                closeAndDeleteOutdatedPartitions(
> -                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
> +                  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
>          } else {
>            // This block covers:
>            //  1. finalization of determinate stage
> {code}
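
To make the effect of the wrong key concrete, here is a minimal standalone sketch of the behaviour described above. It is not the Spark code: `MergeKey`, `finalizeNewer`, and the in-memory `Set` standing in for the DB are hypothetical names chosen for illustration, and the sketch assumes the deferred cleanup task runs after the newer entry has been written.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class FinalizationCleanupSketch {

  /** Hypothetical stand-in for the key under which finalization state is stored. */
  static final class MergeKey {
    final String appId;
    final int appAttemptId;
    final int shuffleId;
    final int shuffleMergeId;

    MergeKey(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
      this.appId = appId;
      this.appAttemptId = appAttemptId;
      this.shuffleId = shuffleId;
      this.shuffleMergeId = shuffleMergeId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof MergeKey)) {
        return false;
      }
      MergeKey k = (MergeKey) o;
      return appId.equals(k.appId) && appAttemptId == k.appAttemptId
          && shuffleId == k.shuffleId && shuffleMergeId == k.shuffleMergeId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, appAttemptId, shuffleId, shuffleMergeId);
    }
  }

  /**
   * Simulates a finalize request for a newer merge id while an older entry
   * is already in the "DB" (an in-memory set here). Returns the
   * shuffleMergeIds left in the DB afterwards.
   */
  public static Set<Integer> finalizeNewer(
      int existingMergeId, int requestedMergeId, boolean applyFix) {
    Set<MergeKey> db = new HashSet<>();
    MergeKey existing = new MergeKey("app", 1, 0, existingMergeId);
    MergeKey requested = new MergeKey("app", 1, 0, requestedMergeId);
    db.add(existing);

    // Buggy variant: cleanup is keyed by the requested (newer) merge id.
    // Fixed variant: cleanup is keyed by the existing (older) merge id.
    MergeKey toRemove = applyFix ? existing : requested;
    Runnable cleanup = () -> db.remove(toRemove);

    // Assumption: the newer entry is written first, the cleanup task runs later.
    db.add(requested);
    cleanup.run();

    Set<Integer> remaining = new HashSet<>();
    for (MergeKey k : db) {
      remaining.add(k.shuffleMergeId);
    }
    return remaining;
  }

  public static void main(String[] args) {
    System.out.println("buggy: " + finalizeNewer(1, 2, false)); // prints "buggy: [1]"
    System.out.println("fixed: " + finalizeNewer(1, 2, true));  // prints "fixed: [2]"
  }
}
```

Under these assumptions the buggy variant leaves only the stale merge id 1 in the store and loses the entry for merge id 2, while the fixed variant leaves only merge id 2.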



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
