This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d786e0ecf [CELEBORN-874] Enrich Fetch log
d786e0ecf is described below

commit d786e0ecf5ac930d08103929487658826afc10e2
Author: Fu Chen <[email protected]>
AuthorDate: Sat Aug 5 12:15:30 2023 +0800

    [CELEBORN-874] Enrich Fetch log
    
    ### What changes were proposed in this pull request?
    
    As title
    
    ### Why are the changes needed?
    
    As title
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #1791 from cfmcgrady/enrich-fetch-log.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../common/network/client/TransportClient.java     | 10 ++++----
 .../network/client/TransportClientFactory.java     | 16 +++++++++----
 .../network/client/TransportResponseHandler.java   | 14 +++++++++++
 .../service/deploy/worker/FetchHandler.scala       | 28 ++++++++++++++++------
 4 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 00c25b56b..2c9eca4cb 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -120,14 +120,14 @@ public class TransportClient implements Closeable {
       int len,
       long fetchDataTimeout,
       ChunkReceivedCallback callback) {
+
+    StreamChunkSlice streamChunkSlice = new StreamChunkSlice(streamId, chunkIndex, offset, len);
     if (logger.isDebugEnabled()) {
       logger.debug(
           "Sending fetch chunk request {} to {}.",
-          chunkIndex,
+          streamChunkSlice,
           NettyUtils.getRemoteAddress(channel));
     }
-
-    StreamChunkSlice streamChunkSlice = new StreamChunkSlice(streamId, chunkIndex, offset, len);
     StdChannelListener listener =
         new StdChannelListener(streamChunkSlice) {
           @Override
@@ -306,9 +306,9 @@ public class TransportClient implements Closeable {
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
       if (future.isSuccess()) {
-        if (logger.isTraceEnabled()) {
+        if (logger.isDebugEnabled()) {
           long timeTaken = System.currentTimeMillis() - startTime;
-          logger.trace(
+          logger.debug(
               "Sending request {} to {} took {} ms",
               requestId,
               NettyUtils.getRemoteAddress(channel),
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index e4b12fb96..9542ce3c0 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -147,8 +147,11 @@ public class TransportClientFactory implements Closeable {
       }
 
       if (cachedClient.isActive()) {
-        logger.trace(
-            "Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient);
+        logger.debug(
+            "Returning cached connection from {} to {}: {}",
+            cachedClient.getChannel().localAddress(),
+            cachedClient.getSocketAddress(),
+            cachedClient);
         return cachedClient;
       }
     }
@@ -169,7 +172,11 @@ public class TransportClientFactory implements Closeable {
 
       if (cachedClient != null) {
         if (cachedClient.isActive()) {
-          logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
+          logger.debug(
+              "Returning cached connection from {} to {}: {}",
+              cachedClient.getChannel().localAddress(),
+              resolvedAddress,
+              cachedClient);
           return cachedClient;
         } else {
           logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
@@ -237,7 +244,8 @@ public class TransportClientFactory implements Closeable {
     TransportClient client = clientRef.get();
     assert client != null : "Channel future completed successfully with null client";
 
-    logger.debug("Connection to {} successful", address);
+    logger.debug(
+        "Connection from {} to {} successful", client.getChannel().localAddress(), address);
 
     return client;
   }
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 2a4767848..7282fa913 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -233,6 +233,17 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   @Override
   public void channelInactive() {
     if (numOutstandingRequests() > 0) {
+      // show the details of outstanding Fetches
+      if (logger.isDebugEnabled()) {
+        if (outstandingFetches.size() > 0) {
+          for (Map.Entry<StreamChunkSlice, FetchRequestInfo> e : outstandingFetches.entrySet()) {
+            StreamChunkSlice key = e.getKey();
+            logger.debug("The channel is closed, but there is still outstanding Fetch {}", key);
+          }
+        } else {
+          logger.debug("The channel is closed, the outstanding Fetches are empty");
+        }
+      }
       String remoteAddress = NettyUtils.getRemoteAddress(channel);
       logger.error(
           "Still have {} requests outstanding when connection from {} is closed",
@@ -262,6 +273,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   public void handle(ResponseMessage message) throws Exception {
     if (message instanceof ChunkFetchSuccess) {
       ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
+      logger.debug("Chunk {} fetch succeeded", resp.streamChunkSlice);
       FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
       if (info == null) {
         logger.warn(
@@ -278,6 +290,8 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
       }
     } else if (message instanceof ChunkFetchFailure) {
       ChunkFetchFailure resp = (ChunkFetchFailure) message;
+      logger.error(
+          "chunk {} fetch failed, errorMessage {}", resp.streamChunkSlice, resp.errorString);
       FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
       if (info == null) {
         logger.warn(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 66e0ecf11..5426bbb77 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -131,8 +131,9 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
               startMapIndex,
               endMapIndex)
           }
-          logDebug(s"Received chunk fetch request $shuffleKey $fileName " +
-            s"$startMapIndex $endMapIndex get file info $fileInfo")
+          logDebug(s"Received chunk fetch request $shuffleKey $fileName $startMapIndex " +
+            s"$endMapIndex get file info $fileInfo from client channel " +
+            s"${NettyUtils.getRemoteAddress(client.getChannel)}")
           if (fileInfo.isHdfs) {
             val streamHandle = new StreamHandle(0, 0)
             client.getChannel.writeAndFlush(new RpcResponse(
@@ -147,11 +148,13 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
               fetchTimeMetrics)
             val streamHandle = new StreamHandle(streamId, fileInfo.numChunks())
             if (fileInfo.numChunks() == 0)
-              logDebug(s"StreamId $streamId fileName $fileName startMapIndex" +
-                s" $startMapIndex endMapIndex $endMapIndex is empty.")
+              logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
+                s"[$startMapIndex-$endMapIndex] is empty. Received from client channel " +
+                s"${NettyUtils.getRemoteAddress(client.getChannel)}")
             else logDebug(
-              s"StreamId $streamId fileName $fileName numChunks ${fileInfo.numChunks()} " +
-                s"startMapIndex $startMapIndex endMapIndex $endMapIndex")
+              s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.numChunks()}, " +
+                s"mapRange [$startMapIndex-$endMapIndex]. Received from client channel " +
+                s"${NettyUtils.getRemoteAddress(client.getChannel)}")
             client.getChannel.writeAndFlush(new RpcResponse(
               request.requestId,
               new NioManagedBuffer(streamHandle.toByteBuffer)))
@@ -215,7 +218,7 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
   }
 
   def handleChunkFetchRequest(client: TransportClient, req: ChunkFetchRequest): Unit = {
-    logTrace(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
+    logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
       s" to fetch block ${req.streamChunkSlice}")
 
     maxChunkBeingTransferred.foreach { threshold =>
@@ -243,6 +246,16 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
       client.getChannel.writeAndFlush(new ChunkFetchSuccess(req.streamChunkSlice, buf))
         .addListener(new GenericFutureListener[Future[_ >: Void]] {
           override def operationComplete(future: Future[_ >: Void]): Unit = {
+            if (future.isSuccess()) {
+              if (log.isDebugEnabled) {
+                logDebug(
+                  s"Sending ChunkFetchSuccess operation succeeded, chunk ${req.streamChunkSlice}")
+              }
+            } else {
+              logError(
+                s"Sending ChunkFetchSuccess operation failed, chunk ${req.streamChunkSlice}",
+                future.cause())
+            }
             chunkStreamManager.chunkSent(req.streamChunkSlice.streamId)
             if (fetchTimeMetric != null) {
               fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
@@ -267,6 +280,7 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
 
   /** Invoked when the channel associated with the given client is active. */
   override def channelActive(client: TransportClient): Unit = {
+    logDebug(s"channel active ${client.getSocketAddress}")
     workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
     super.channelActive(client)
   }

Reply via email to