This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch eof in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 23f27c78066e43686dc0560c50a91e80d214781f Author: zky.zhoukeyong <[email protected]> AuthorDate: Tue Sep 26 10:59:43 2023 +0800 add logs --- .../celeborn/client/read/CelebornInputStream.java | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java index 8645cc4c1..f9592cd32 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java +++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java @@ -442,6 +442,7 @@ public abstract class CelebornInputStream extends InputStream { } if (!fillBuffer()) { + logger.error("fillBuffered returned false in read()"); return -1; } @@ -468,6 +469,9 @@ public abstract class CelebornInputStream extends InputStream { while (readBytes < len) { while (position >= limit) { if (!fillBuffer()) { + if (readBytes <= 0) { + logger.error("expected size {}, actual read bytes {}", len, readBytes); + } return readBytes > 0 ? readBytes : -1; } } @@ -493,6 +497,7 @@ public abstract class CelebornInputStream extends InputStream { logger.debug("Release chunk {}", currentChunk); currentChunk.release(); currentChunk = null; + logger.error("currentChunk is set to null in close()"); } if (currentReader != null) { logger.debug("Closing reader"); @@ -509,6 +514,12 @@ public abstract class CelebornInputStream extends InputStream { currentChunk.release(); } currentChunk = null; + logger.error( + "currentChunk is set to null in moveToNextChunk, shuffleKey {}, attemptNumber {}, startMapIndex {}, endMapIndex {}", + shuffleKey, + attemptNumber, + startMapIndex, + endMapIndex); if (currentReader.hasNext()) { currentChunk = getNextChunk(); return true; @@ -525,6 +536,12 @@ public abstract class CelebornInputStream extends InputStream { private boolean fillBuffer() throws IOException { if (currentChunk == null) { + logger.error( + "fillBuffer returns false because currentChunk is null, shuffleKey {}, attemptNumber {}, startMapIndex {}, endMapIndex {}", + shuffleKey, + attemptNumber, + startMapIndex, + endMapIndex); return false; } @@ -578,12 +595,15 @@ public abstract class CelebornInputStream extends InputStream { hasData = true; break; } else { - logger.debug( + logger.error( "Skip duplicated batch: mapId {}, attemptId {}, batchId {}.", mapId, attemptId, batchId); } + } else { + logger.error( + "succeeded attempt id {}, current attempt id {}", attempts[mapId], attemptId); } }
