This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 33f3f1600 [CELEBORN-874] Enrich Fetch log
33f3f1600 is described below
commit 33f3f160093bb900dc65699a9aa592b5c4c491af
Author: Fu Chen <[email protected]>
AuthorDate: Sat Aug 5 12:15:30 2023 +0800
[CELEBORN-874] Enrich Fetch log
---
.../common/network/client/TransportClient.java | 10 +++----
.../network/client/TransportClientFactory.java | 16 ++++++++---
.../network/client/TransportResponseHandler.java | 14 +++++++++
.../service/deploy/worker/FetchHandler.scala | 33 +++++++++++++++++-----
4 files changed, 57 insertions(+), 16 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 00c25b56b..2c9eca4cb 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -120,14 +120,14 @@ public class TransportClient implements Closeable {
int len,
long fetchDataTimeout,
ChunkReceivedCallback callback) {
+
+ StreamChunkSlice streamChunkSlice = new StreamChunkSlice(streamId, chunkIndex, offset, len);
if (logger.isDebugEnabled()) {
logger.debug(
"Sending fetch chunk request {} to {}.",
- chunkIndex,
+ streamChunkSlice,
NettyUtils.getRemoteAddress(channel));
}
-
- StreamChunkSlice streamChunkSlice = new StreamChunkSlice(streamId, chunkIndex, offset, len);
StdChannelListener listener =
new StdChannelListener(streamChunkSlice) {
@Override
@@ -306,9 +306,9 @@ public class TransportClient implements Closeable {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
- if (logger.isTraceEnabled()) {
+ if (logger.isDebugEnabled()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace(
+ logger.debug(
"Sending request {} to {} took {} ms",
requestId,
NettyUtils.getRemoteAddress(channel),
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index e4b12fb96..9542ce3c0 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -147,8 +147,11 @@ public class TransportClientFactory implements Closeable {
}
if (cachedClient.isActive()) {
- logger.trace(
- "Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient);
+ logger.debug(
+ "Returning cached connection from {} to {}: {}",
+ cachedClient.getChannel().localAddress(),
+ cachedClient.getSocketAddress(),
+ cachedClient);
return cachedClient;
}
}
@@ -169,7 +172,11 @@ public class TransportClientFactory implements Closeable {
if (cachedClient != null) {
if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
+ logger.debug(
+ "Returning cached connection from {} to {}: {}",
+ cachedClient.getChannel().localAddress(),
+ resolvedAddress,
+ cachedClient);
return cachedClient;
} else {
logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
@@ -237,7 +244,8 @@ public class TransportClientFactory implements Closeable {
TransportClient client = clientRef.get();
assert client != null : "Channel future completed successfully with null client";
- logger.debug("Connection to {} successful", address);
+ logger.debug(
+ "Connection from {} to {} successful", client.getChannel().localAddress(), address);
return client;
}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 2a4767848..7282fa913 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -233,6 +233,17 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void channelInactive() {
if (numOutstandingRequests() > 0) {
+ // show the details of outstanding Fetches
+ if (logger.isDebugEnabled()) {
+ if (outstandingFetches.size() > 0) {
+ for (Map.Entry<StreamChunkSlice, FetchRequestInfo> e : outstandingFetches.entrySet()) {
+ StreamChunkSlice key = e.getKey();
+ logger.debug("The channel is closed, but there is still outstanding Fetch {}", key);
+ }
+ } else {
+ logger.debug("The channel is closed, the outstanding Fetches are empty");
+ }
+ }
String remoteAddress = NettyUtils.getRemoteAddress(channel);
logger.error(
"Still have {} requests outstanding when connection from {} is closed",
@@ -262,6 +273,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
public void handle(ResponseMessage message) throws Exception {
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
+ logger.debug("Chunk {} fetch succeeded", resp.streamChunkSlice);
FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
if (info == null) {
logger.warn(
@@ -278,6 +290,8 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
+ logger.error(
+ "chunk {} fetch failed, errorMessage {}", resp.streamChunkSlice, resp.errorString);
FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
if (info == null) {
logger.warn(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 9b4d08f8c..f76429fef 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -131,8 +131,9 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
startMapIndex,
endMapIndex)
}
- logDebug(s"Received chunk fetch request $shuffleKey $fileName " +
- s"$startMapIndex $endMapIndex get file info $fileInfo")
+ logDebug(s"Received chunk fetch request $shuffleKey $fileName $startMapIndex " +
+ s"$endMapIndex get file info $fileInfo from client channel " +
+ s"${NettyUtils.getRemoteAddress(client.getChannel)}")
if (fileInfo.isHdfs) {
val streamHandle = new StreamHandle(0, 0)
client.getChannel.writeAndFlush(new RpcResponse(
@@ -147,11 +148,13 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
fetchTimeMetrics)
val streamHandle = new StreamHandle(streamId, fileInfo.numChunks())
if (fileInfo.numChunks() == 0)
- logDebug(s"StreamId $streamId fileName $fileName startMapIndex" +
- s" $startMapIndex endMapIndex $endMapIndex is empty.")
+ logDebug(s"StreamId $streamId, fileName $fileName, mapRange " +
+ s"[$startMapIndex-$endMapIndex] is empty. Received from client channel " +
+ s"${NettyUtils.getRemoteAddress(client.getChannel)}")
else logDebug(
- s"StreamId $streamId fileName $fileName numChunks ${fileInfo.numChunks()} " +
- s"startMapIndex $startMapIndex endMapIndex $endMapIndex")
+ s"StreamId $streamId, fileName $fileName, numChunks ${fileInfo.numChunks()}, " +
+ s"mapRange [$startMapIndex-$endMapIndex]. Received from client channel " +
+ s"${NettyUtils.getRemoteAddress(client.getChannel)}")
client.getChannel.writeAndFlush(new RpcResponse(
request.requestId,
new NioManagedBuffer(streamHandle.toByteBuffer)))
@@ -215,7 +218,7 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
}
def handleChunkFetchRequest(client: TransportClient, req: ChunkFetchRequest): Unit = {
- logTrace(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
+ logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
s" to fetch block ${req.streamChunkSlice}")
maxChunkBeingTransferred.foreach { threshold =>
@@ -243,6 +246,16 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
client.getChannel.writeAndFlush(new ChunkFetchSuccess(req.streamChunkSlice, buf))
.addListener(new GenericFutureListener[Future[_ >: Void]] {
override def operationComplete(future: Future[_ >: Void]): Unit = {
+ if (future.isSuccess()) {
+ if (log.isDebugEnabled) {
+ logDebug(
+ s"Sending ChunkFetchSuccess operation succeeded, chunk ${req.streamChunkSlice}")
+ }
+ } else {
+ logError(
+ s"Sending ChunkFetchSuccess operation failed, chunk ${req.streamChunkSlice}",
+ future.cause())
+ }
chunkStreamManager.chunkSent(req.streamChunkSlice.streamId)
if (fetchTimeMetric != null) {
fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
@@ -265,6 +278,12 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
override def checkRegistered: Boolean = registered.get
+ /** Invoked when the channel associated with the given client is active. */
+ override def channelActive(client: TransportClient): Unit = {
+ logDebug(s"channel active ${client.getSocketAddress}")
+ super.channelActive(client)
+ }
+
override def channelInactive(client: TransportClient): Unit = {
creditStreamManager.connectionTerminated(client.getChannel)
logDebug(s"channel inactive ${client.getSocketAddress}")