This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new ddf22f775 [CELEBORN-2281] Improve error logging and null checks in 
CreditStreamManager
ddf22f775 is described below

commit ddf22f7752e4dd5a42a8905efb4161af85678353
Author: sychen <[email protected]>
AuthorDate: Wed Mar 18 15:55:32 2026 +0800

    [CELEBORN-2281] Improve error logging and null checks in CreditStreamManager
    
    ### What changes were proposed in this pull request?
    
    - Initialize `AtomicReference<IOException>` with proper syntax.
    - Add exception to `logger.error` for better error context.
    - Simplify and improve null checks and logging in `addCredit` and 
`cleanResource` methods.
    
    ### Why are the changes needed?
    
    nit.
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GHA.
    
    Closes #3626 from cxzl25/CELEBORN-2281.
    
    Lead-authored-by: sychen <[email protected]>
    Co-authored-by: cxzl25 <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit af0ba1a5ec0e1faf3d4a0d189058c755aeb6b18c)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../deploy/worker/storage/CreditStreamManager.java | 25 +++++++++-------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index e8f408afe..1272776d3 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -93,7 +93,7 @@ public class CreditStreamManager {
         streamId,
         fileInfo);
 
-    AtomicReference<IOException> exception = new AtomicReference();
+    AtomicReference<IOException> exception = new AtomicReference<>();
     MapPartitionData mapPartitionData =
         activeMapPartitions.compute(
             fileInfo,
@@ -166,7 +166,7 @@ public class CreditStreamManager {
         mapPartitionData.addReaderCredit(numCredit, streamId);
       }
     } catch (Throwable e) {
-      logger.error("streamId: {}, add credit end: {}", streamId, numCredit);
+      logger.error("streamId: {}, failed to add credit: {}", streamId, 
numCredit, e);
     }
   }
 
@@ -185,23 +185,21 @@ public class CreditStreamManager {
         logger.warn("Only non-null SegmentMapPartitionData is expected for 
notifyRequiredSegment.");
       }
     } catch (Throwable e) {
-      logger.error(
-          String.format("Fail to notify segmentId %s for stream %s.", 
requiredSegmentId, streamId),
-          e);
+      logger.error("Failed to notify segmentId {} for stream {}.", 
requiredSegmentId, streamId, e);
       throw e;
     }
   }
 
   public void addCredit(int numCredit, long streamId) {
-    if (!streams.containsKey(streamId)) {
+    StreamState streamState = streams.get(streamId);
+    if (streamState == null) {
       // In flink hybrid shuffle integration strategy, the stream may release 
in worker before
       // client receive bufferStreamEnd,
       // and the client may send request with old streamId, so ignore 
non-exist streams.
       logger.warn("Ignore AddCredit from stream {}, numCredit {}.", streamId, 
numCredit);
       return;
     }
-    MapPartitionData mapPartitionData = 
streams.get(streamId).getMapPartitionData();
-    addCredit(mapPartitionData, numCredit, streamId);
+    addCredit(streamState.getMapPartitionData(), numCredit, streamId);
   }
 
   public void notifyRequiredSegment(int requiredSegmentId, long streamId, int 
subPartitionId) {
@@ -278,8 +276,9 @@ public class CreditStreamManager {
 
   public void cleanResource(Long streamId) {
     logger.debug("received clean stream: {}", streamId);
-    if (streams.containsKey(streamId)) {
-      MapPartitionData mapPartitionData = 
streams.get(streamId).getMapPartitionData();
+    StreamState streamState = streams.get(streamId);
+    if (streamState != null) {
+      MapPartitionData mapPartitionData = streamState.getMapPartitionData();
       if (mapPartitionData != null) {
         if (mapPartitionData.releaseReader(streamId)) {
           streams.remove(streamId);
@@ -380,11 +379,7 @@ public class CreditStreamManager {
 
     @Override
     public String toString() {
-      final StringBuilder sb = new StringBuilder("DelayedStreamId{");
-      sb.append("createMillis=").append(createMillis);
-      sb.append(", streamId=").append(streamId);
-      sb.append('}');
-      return sb.toString();
+      return "DelayedStreamId{" + "createMillis=" + createMillis + ", 
streamId=" + streamId + '}';
     }
   }
 }

Reply via email to