This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ad3aa0f3 [#2618] fix(spark): Invalid reassign status show in spark UI 
tab (#2620)
6ad3aa0f3 is described below

commit 6ad3aa0f34c7d58ad3bf04697d9c738b0ec4df71
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Mon Sep 22 16:56:36 2025 +0800

    [#2618] fix(spark): Invalid reassign status show in spark UI tab (#2620)
    
    ### What changes were proposed in this pull request?
    
    Fix the invalid reassign status shown in the Spark UI tab.
    
    ### Why are the changes needed?
    
    In the original design, the reassign info event was posted in the final 
stop method. However, because the post operation runs asynchronously, the 
listener may not receive the event before the JVM shuts down, so the event is 
effectively dropped. This change instead posts the event as soon as a reassign 
is first triggered.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested with an internal Spark job.
---
 .../shuffle/manager/RssShuffleManagerBase.java     | 31 +++++++++++-----------
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 16 ++---------
 2 files changed, 17 insertions(+), 30 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index e9994aa35..45615898e 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -1024,7 +1024,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
         "The stage retry has been triggered successfully for the shuffleId: 
{}, attemptNumber: {}",
         shuffleId,
         stageAttemptNumber);
-    this.reassignTriggeredOnStageRetry.set(true);
+    postReassignTriggeredEvent(reassignTriggeredOnStageRetry);
     return true;
   }
 
@@ -1148,15 +1148,24 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
           System.currentTimeMillis() - startTime,
           partitionSplit,
           reassignResult);
-      if (partitionSplit) {
-        this.reassignTriggeredOnPartitionSplit.set(true);
-      } else {
-        this.reassignTriggeredOnBlockSendFailure.set(true);
-      }
+      postReassignTriggeredEvent(
+          partitionSplit ? reassignTriggeredOnPartitionSplit : 
reassignTriggeredOnBlockSendFailure);
       return internalHandle;
     }
   }
 
+  /** This method will check the historical state to avoid posting duplicate 
events */
+  private void postReassignTriggeredEvent(AtomicBoolean isReassign) {
+    if (isReassign.compareAndSet(false, true)) {
+      TaskReassignInfoEvent reassignInfoEvent =
+          new TaskReassignInfoEvent(
+              reassignTriggeredOnPartitionSplit.get(),
+              reassignTriggeredOnBlockSendFailure.get(),
+              reassignTriggeredOnStageRetry.get());
+      
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent);
+    }
+  }
+
   private Set<ShuffleServerInfo> requestReassignServer(
       int stageId,
       int stageAttemptNumber,
@@ -1195,16 +1204,6 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
 
   @Override
   public void stop() {
-    if (this.isDriver && partitionReassignEnabled) {
-      // send reassign event into spark event store
-      TaskReassignInfoEvent reassignInfoEvent =
-          new TaskReassignInfoEvent(
-              reassignTriggeredOnPartitionSplit.get(),
-              reassignTriggeredOnBlockSendFailure.get(),
-              reassignTriggeredOnStageRetry.get());
-      
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent);
-    }
-
     if (managerClientSupplier != null
         && managerClientSupplier instanceof ExpiringCloseableSupplier) {
       ((ExpiringCloseableSupplier<ShuffleManagerClient>) 
managerClientSupplier).close();
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 3fadde1e9..8916916a3 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -334,21 +334,9 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
             </li>
             <li>
               <a>
-                <strong>ReassignTriggeredOnPartitionSplit: </strong>
+                <strong>Reassign Status:</strong>
               </a>
-              {reassignInfo.isReassignTriggeredOnPartitionSplit}
-            </li>
-            <li>
-              <a>
-                <strong>ReassignTriggeredOnBlockSendFailure: </strong>
-              </a>
-              {reassignInfo.isReassignTriggeredOnBlockSendFailure}
-            </li>
-            <li>
-              <a>
-                <strong>ReassignTriggeredOnStageRetry: </strong>
-              </a>
-              {reassignInfo.isReassignTriggeredOnStageRetry}
+              
partitionSplit={reassignInfo.isReassignTriggeredOnPartitionSplit}, 
blockSentFailure={reassignInfo.isReassignTriggeredOnBlockSendFailure}, 
stageRetry={reassignInfo.isReassignTriggeredOnStageRetry}
             </li>
           </ul>
         </div>

Reply via email to