This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 94c191702 [CELEBORN-2052] Fix unexpected warning logs in Flink caused by duplicate BufferStreamEnd messages
94c191702 is described below

commit 94c191702ff41dbd6903cbd08dcb126d7ffeeb33
Author: codenohup <huangxu.wal...@gmail.com>
AuthorDate: Mon Jul 7 17:56:58 2025 +0800

    [CELEBORN-2052] Fix unexpected warning logs in Flink caused by duplicate BufferStreamEnd messages
    
    ### What changes were proposed in this pull request?
    In the Flink shuffle service plugin, _RemoteBufferStreamReader_ closes itself when it receives an _EndOfPartition_ event.
    When the Celeborn worker releases the corresponding stream, it also sends a _BufferStreamEnd_ message to _RemoteBufferStreamReader_. As a result, _ReadClientHandler_ receives a message for an already-closed stream and logs an unexpected warning.
    This change skips the warning when the message is a _BufferStreamEnd_, and includes the message in the warning for all other cases.
    
    ![image](https://github.com/user-attachments/assets/e48af502-385a-4e24-9d40-0557ccc886c5)
    
    ### Why are the changes needed?
    Incorrect logs can easily cause confusion for users.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Covered by existing test cases.
    
    Closes #3357 from codenohup/fix-bufferstreamend.
    
    Authored-by: codenohup <huangxu.wal...@gmail.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
    (cherry picked from commit cd5d9cd93d1bf1a9ac9250edf1bf62aee0eb0ae2)
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../org/apache/celeborn/plugin/flink/network/ReadClientHandler.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
index 1ef2b7781..75e3bfe7b 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
@@ -64,11 +64,13 @@ public class ReadClientHandler extends BaseMessageHandler {
       logger.debug("received streamId: {}, msg :{}", streamId, msg);
       handler.accept(msg);
     } else {
-      if (msg != null && msg instanceof ReadData) {
+      if (msg instanceof ReadData) {
         ((ReadData) msg).getFlinkBuffer().release();
       }
 
-      logger.warn("Unexpected streamId received: {}", streamId);
+      if (!(msg instanceof BufferStreamEnd)) {
+        logger.warn("Unexpected streamId received: {}, msg: {}", streamId, msg);
+      }
     }
   }
 

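The behavior of the patched `else` branch can be illustrated with a small standalone sketch. It is not the Celeborn code: the message types, the `handlers` map, and the `warnings` list are hypothetical stand-ins, so the example runs without Celeborn or Flink on the classpath. It only mirrors the control flow in the diff above: release the buffer of a `ReadData` message for an unknown stream, and warn unless the message is a `BufferStreamEnd`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class StreamDispatchSketch {
  // Hypothetical stand-ins for the real message classes (assumption, not Celeborn's API).
  interface Message {}

  static final class ReadData implements Message {
    boolean released = false;

    void release() {
      released = true; // stands in for getFlinkBuffer().release()
    }
  }

  static final class BufferStreamEnd implements Message {}

  static final class OtherMessage implements Message {}

  // Registered per-stream handlers; a closed reader is assumed to have removed its entry.
  final Map<Long, Consumer<Message>> handlers = new HashMap<>();
  // Collected warnings, used here in place of a real logger so they can be inspected.
  final List<String> warnings = new ArrayList<>();

  void dispatch(long streamId, Message msg) {
    Consumer<Message> handler = handlers.get(streamId);
    if (handler != null) {
      handler.accept(msg);
      return;
    }
    // instanceof is false for null, so no separate null check is needed.
    if (msg instanceof ReadData) {
      ((ReadData) msg).release();
    }
    // A BufferStreamEnd for an already-closed stream is expected, so it is not reported.
    if (!(msg instanceof BufferStreamEnd)) {
      warnings.add("Unexpected streamId received: " + streamId + ", msg: " + msg);
    }
  }

  public static void main(String[] args) {
    StreamDispatchSketch sketch = new StreamDispatchSketch();
    sketch.dispatch(7L, new BufferStreamEnd()); // silently ignored
    sketch.dispatch(7L, new OtherMessage());    // recorded as a warning
    System.out.println("warnings recorded: " + sketch.warnings.size());
  }
}
```

In this sketch a late `BufferStreamEnd` is dropped quietly, while any other message for an unknown stream is still reported together with its content.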