This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch eof
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 23f27c78066e43686dc0560c50a91e80d214781f
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Sep 26 10:59:43 2023 +0800

    add logs
---
 .../celeborn/client/read/CelebornInputStream.java  | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8645cc4c1..f9592cd32 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -442,6 +442,7 @@ public abstract class CelebornInputStream extends InputStream {
       }
 
       if (!fillBuffer()) {
+        logger.error("fillBuffered returned false in read()");
         return -1;
       }
 
@@ -468,6 +469,9 @@ public abstract class CelebornInputStream extends InputStream {
       while (readBytes < len) {
         while (position >= limit) {
           if (!fillBuffer()) {
+            if (readBytes <= 0) {
+              logger.error("expected size {}, actual read bytes {}", len, readBytes);
+            }
             return readBytes > 0 ? readBytes : -1;
           }
         }
@@ -493,6 +497,7 @@ public abstract class CelebornInputStream extends InputStream {
         logger.debug("Release chunk {}", currentChunk);
         currentChunk.release();
         currentChunk = null;
+        logger.error("currentChunk is set to null in close()");
       }
       if (currentReader != null) {
         logger.debug("Closing reader");
@@ -509,6 +514,12 @@ public abstract class CelebornInputStream extends InputStream {
         currentChunk.release();
       }
       currentChunk = null;
+      logger.error(
+          "currentChunk is set to null in moveToNextChunk, shuffleKey {}, attemptNumber {}, startMapIndex {}, endMapIndex {}",
+          shuffleKey,
+          attemptNumber,
+          startMapIndex,
+          endMapIndex);
       if (currentReader.hasNext()) {
         currentChunk = getNextChunk();
         return true;
@@ -525,6 +536,12 @@ public abstract class CelebornInputStream extends InputStream {
 
     private boolean fillBuffer() throws IOException {
       if (currentChunk == null) {
+        logger.error(
+            "fillBuffer returns false because currentChunk is null, shuffleKey {}, attemptNumber {}, startMapIndex {}, endMapIndex {}",
+            shuffleKey,
+            attemptNumber,
+            startMapIndex,
+            endMapIndex);
         return false;
       }
 
@@ -578,12 +595,15 @@ public abstract class CelebornInputStream extends InputStream {
             hasData = true;
             break;
           } else {
-            logger.debug(
+            logger.error(
                 "Skip duplicated batch: mapId {}, attemptId {}, batchId {}.",
                 mapId,
                 attemptId,
                 batchId);
           }
+        } else {
+          logger.error(
+              "succeeded attempt id {}, current attempt id {}", attempts[mapId], attemptId);
         }
       }
 

Reply via email to