This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git

The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 94c191702 [CELEBORN-2052] Fix unexpected warning logs in Flink caused by duplicate BufferStreamEnd messages
94c191702 is described below

commit 94c191702ff41dbd6903cbd08dcb126d7ffeeb33
Author: codenohup <huangxu.wal...@gmail.com>
AuthorDate: Mon Jul 7 17:56:58 2025 +0800

[CELEBORN-2052] Fix unexpected warning logs in Flink caused by duplicate BufferStreamEnd messages

### What changes were proposed in this pull request?

In the Flink shuffle service plugin, _RemoteBufferStreamReader_ closes itself when it receives an _EndOfPartition_ event. When the Celeborn worker then releases the corresponding stream, it also sends a _BufferStreamEnd_ message to _RemoteBufferStreamReader_. As a result, _ReadClientHandler_ receives a message for an already-closed stream and logs an unexpected warning.

This change makes _ReadClientHandler_ skip the warning when the message for an unknown stream is a _BufferStreamEnd_. For any other message type, the warning is still logged and now includes the message itself.

### Why are the changes needed?

Incorrect logs can easily confuse users.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Covered by existing test cases.

Closes #3357 from codenohup/fix-bufferstreamend.

Authored-by: codenohup <huangxu.wal...@gmail.com>
Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
(cherry picked from commit cd5d9cd93d1bf1a9ac9250edf1bf62aee0eb0ae2)
Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../org/apache/celeborn/plugin/flink/network/ReadClientHandler.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
index 1ef2b7781..75e3bfe7b 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
@@ -64,11 +64,13 @@ public class ReadClientHandler extends BaseMessageHandler {
       logger.debug("received streamId: {}, msg :{}", streamId, msg);
       handler.accept(msg);
     } else {
-      if (msg != null && msg instanceof ReadData) {
+      if (msg instanceof ReadData) {
         ((ReadData) msg).getFlinkBuffer().release();
       }
-      logger.warn("Unexpected streamId received: {}", streamId);
+      if (!(msg instanceof BufferStreamEnd)) {
+        logger.warn("Unexpected streamId received: {}, msg: {}", streamId, msg);
+      }
     }
   }
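
For readers who want to see the patched branch in isolation, here is a minimal, self-contained sketch of the dispatch logic. It is not the actual ReadClientHandler: the Message interface, the simplified ReadData, BufferStreamEnd and OtherMessage classes, the release() method, the handlers map and the warnings list are all hypothetical stand-ins, and warnings are collected in a list rather than sent to a logger so the behaviour can be inspected. It also illustrates why dropping the `msg != null` check is safe: `instanceof` already evaluates to false for null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class StreamDispatchSketch {
  // Hypothetical stand-ins for the real Celeborn message types.
  interface Message {}

  static final class ReadData implements Message {
    boolean released = false;

    // Stand-in for getFlinkBuffer().release() in the real handler.
    void release() {
      released = true;
    }
  }

  static final class BufferStreamEnd implements Message {}

  static final class OtherMessage implements Message {}

  // streamId -> handler for streams that are still open (assumed structure).
  final Map<Long, Consumer<Message>> handlers = new HashMap<>();

  // Collected instead of logged, so the effect of the patch is observable.
  final List<String> warnings = new ArrayList<>();

  void dispatch(long streamId, Message msg) {
    Consumer<Message> handler = handlers.get(streamId);
    if (handler != null) {
      handler.accept(msg);
      return;
    }
    // instanceof is false for null, so no separate null check is needed.
    if (msg instanceof ReadData) {
      ((ReadData) msg).release();
    }
    // A late BufferStreamEnd for a closed stream is expected; stay silent.
    if (!(msg instanceof BufferStreamEnd)) {
      warnings.add("Unexpected streamId received: " + streamId + ", msg: " + msg);
    }
  }

  public static void main(String[] args) {
    StreamDispatchSketch sketch = new StreamDispatchSketch();
    sketch.dispatch(1L, new BufferStreamEnd()); // no warning recorded
    sketch.dispatch(1L, new OtherMessage());    // one warning recorded
    System.out.println("warnings recorded: " + sketch.warnings.size());
  }
}
```

In this sketch, a buffer that arrives for a closed stream is still released and still produces a warning; only the duplicate end-of-stream notification is suppressed.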