This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 15ea5d366 [CELEBORN-1930][CIP-12] Support HARD_SPLIT in PushMergedData 
should handle congestion control NPE issue
15ea5d366 is described below

commit 15ea5d366470b691b2bea16164c0605b510a45d5
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Mar 26 23:44:04 2025 -0700

    [CELEBORN-1930][CIP-12] Support HARD_SPLIT in PushMergedData should handle 
congestion control NPE issue
    
    ### What changes were proposed in this pull request?
    When a hard split happens, some FileWriters may be null, which causes an 
NPE while handling congestion control.
    
    ### Why are the changes needed?
    When a hard split happens, some FileWriters may be null, so reading the 
congestion control context from `fileWriters.head` can throw an NPE.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
    
    Closes #3176 from leixm/CELEBORN-1930.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../service/deploy/worker/PushDataHandler.scala      | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 30a12535b..c8686fda1 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -675,9 +675,13 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
                   }
                   // Only primary data enable replication will push data to 
replica
                   Option(CongestionController.instance()) match {
-                    case Some(congestionController) if fileWriters.nonEmpty =>
-                      if (congestionController.isUserCongested(
-                          fileWriters.head.getUserCongestionControlContext)) {
+                    case Some(congestionController) =>
+                      val userCongested =
+                        fileWriters
+                          .find(_ != null)
+                          .map(_.getUserCongestionControlContext)
+                          .exists(congestionController.isUserCongested)
+                      if (userCongested) {
                         // Check whether primary congest the data though the 
replicas doesn't congest
                         // it(the response is empty)
                         pushMergedDataCallback.onSuccess(
@@ -780,9 +784,13 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
             index += 1
           }
           Option(CongestionController.instance()) match {
-            case Some(congestionController) if fileWriters.nonEmpty =>
-              if (congestionController.isUserCongested(
-                  fileWriters.head.getUserCongestionControlContext)) {
+            case Some(congestionController) =>
+              val userCongested =
+                fileWriters
+                  .find(_ != null)
+                  .map(_.getUserCongestionControlContext)
+                  .exists(congestionController.isUserCongested)
+              if (userCongested) {
                 if (isPrimary) {
                   
pushMergedDataCallback.onSuccess(StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED)
                 } else {

Reply via email to