This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 6ad3aa0f3 [#2618] fix(spark): Invalid reassign status show in spark UI tab (#2620) 6ad3aa0f3 is described below commit 6ad3aa0f34c7d58ad3bf04697d9c738b0ec4df71 Author: Junfan Zhang <zus...@apache.org> AuthorDate: Mon Sep 22 16:56:36 2025 +0800 [#2618] fix(spark): Invalid reassign status show in spark UI tab (#2620) ### What changes were proposed in this pull request? Fix the invalid reassign status shown in the Spark UI tab. ### Why are the changes needed? In the original design, the reassign info event was sent in the final `stop()` method. However, because the post operation runs asynchronously, the listener may not receive the event before the JVM shuts down, so the event is effectively dropped. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Internal Spark job test. --- .../shuffle/manager/RssShuffleManagerBase.java | 31 +++++++++++----------- .../scala/org/apache/spark/ui/ShufflePage.scala | 16 ++--------- 2 files changed, 17 insertions(+), 30 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index e9994aa35..45615898e 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -1024,7 +1024,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac "The stage retry has been triggered successfully for the shuffleId: {}, attemptNumber: {}", shuffleId, stageAttemptNumber); - this.reassignTriggeredOnStageRetry.set(true); + postReassignTriggeredEvent(reassignTriggeredOnStageRetry); return true; } @@ -1148,15 +1148,24 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac 
System.currentTimeMillis() - startTime, partitionSplit, reassignResult); - if (partitionSplit) { - this.reassignTriggeredOnPartitionSplit.set(true); - } else { - this.reassignTriggeredOnBlockSendFailure.set(true); - } + postReassignTriggeredEvent( + partitionSplit ? reassignTriggeredOnPartitionSplit : reassignTriggeredOnBlockSendFailure); return internalHandle; } } + /** This method will check the historical state to avoid posting duplicate events */ + private void postReassignTriggeredEvent(AtomicBoolean isReassign) { + if (isReassign.compareAndSet(false, true)) { + TaskReassignInfoEvent reassignInfoEvent = + new TaskReassignInfoEvent( + reassignTriggeredOnPartitionSplit.get(), + reassignTriggeredOnBlockSendFailure.get(), + reassignTriggeredOnStageRetry.get()); + RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent); + } + } + private Set<ShuffleServerInfo> requestReassignServer( int stageId, int stageAttemptNumber, @@ -1195,16 +1204,6 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac @Override public void stop() { - if (this.isDriver && partitionReassignEnabled) { - // send reassign event into spark event store - TaskReassignInfoEvent reassignInfoEvent = - new TaskReassignInfoEvent( - reassignTriggeredOnPartitionSplit.get(), - reassignTriggeredOnBlockSendFailure.get(), - reassignTriggeredOnStageRetry.get()); - RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent); - } - if (managerClientSupplier != null && managerClientSupplier instanceof ExpiringCloseableSupplier) { ((ExpiringCloseableSupplier<ShuffleManagerClient>) managerClientSupplier).close(); diff --git a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala index 3fadde1e9..8916916a3 100644 --- a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala +++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala @@ -334,21 +334,9 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging { </li> <li> <a> - <strong>ReassignTriggeredOnPartitionSplit: </strong> + <strong>Reassign Status:</strong> </a> - {reassignInfo.isReassignTriggeredOnPartitionSplit} - </li> - <li> - <a> - <strong>ReassignTriggeredOnBlockSendFailure: </strong> - </a> - {reassignInfo.isReassignTriggeredOnBlockSendFailure} - </li> - <li> - <a> - <strong>ReassignTriggeredOnStageRetry: </strong> - </a> - {reassignInfo.isReassignTriggeredOnStageRetry} + partitionSplit={reassignInfo.isReassignTriggeredOnPartitionSplit}, blockSentFailure={reassignInfo.isReassignTriggeredOnBlockSendFailure}, stageRetry={reassignInfo.isReassignTriggeredOnStageRetry} </li> </ul> </div>