This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 33ba0e02f [CELEBORN-1775] Improve some logs
33ba0e02f is described below
commit 33ba0e02f56bfa032c02d1e41c52573c79661b1b
Author: onebox-li <[email protected]>
AuthorDate: Mon Dec 16 16:24:18 2024 +0800
[CELEBORN-1775] Improve some logs
### What changes were proposed in this pull request?
Improve some log messages, mainly those emitted when checking commit results and
when waiting for partition locations to become empty during graceful worker shutdown.
### Why are the changes needed?
Same as above: the affected log messages needed improvement.
### Does this PR introduce _any_ user-facing change?
Some logs changed.
### How was this patch tested?
Manual test.
Closes #2995 from onebox-li/improve-logs.
Authored-by: onebox-li <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/client/commit/CommitHandler.scala | 26 +++++++++++++---------
.../network/client/TransportClientFactory.java | 2 +-
.../common/meta/WorkerPartitionLocationInfo.scala | 16 +++++++++++--
.../celeborn/common/rpc/netty/Dispatcher.scala | 2 +-
.../deploy/worker/memory/MemoryManager.java | 4 ++--
.../service/deploy/worker/Controller.scala | 2 +-
.../deploy/worker/storage/StorageManager.scala | 8 +++++--
7 files changed, 40 insertions(+), 20 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 9cade38cc..637a5c640 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -316,10 +316,15 @@ abstract class CommitHandler(
case scala.util.Success(res) =>
res.status match {
case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS |
StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED |
StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
- logInfo(s"Request commitFiles return ${res.status} for " +
- s"${Utils.makeShuffleKey(appUniqueId, shuffleId)}")
- if (res.status != StatusCode.SUCCESS && res.status !=
StatusCode.WORKER_EXCLUDED) {
- commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
+ if (res.status == StatusCode.SUCCESS) {
+ logDebug(s"Request commitFiles return ${res.status} for " +
+ s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from
worker ${worker.readableAddress()}")
+ } else {
+ logWarning(s"Request commitFiles return ${res.status} for
" +
+ s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from
worker ${worker.readableAddress()}")
+ if (res.status != StatusCode.WORKER_EXCLUDED) {
+ commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
+ }
}
processResponse(res, worker)
iter.remove()
@@ -333,7 +338,7 @@ abstract class CommitHandler(
s"Request commitFiles return
${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for
${status.retriedTimes}/$maxRetries, will not retry")
val res = createFailResponse(status)
- processResponse(res, status.workerInfo)
+ processResponse(res, worker)
iter.remove()
}
case _ =>
@@ -341,16 +346,15 @@ abstract class CommitHandler(
}
case scala.util.Failure(e) =>
- val worker = status.workerInfo
if (status.retriedTimes < maxRetries) {
logError(
- s"Ask worker($worker) CommitFiles for $shuffleId failed" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will
retry.",
e)
retryCommitFiles(status, currentTime)
} else {
logError(
- s"Ask worker($worker) CommitFiles for $shuffleId failed" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not
retry.",
e)
val res = createFailResponse(status)
@@ -361,12 +365,12 @@ abstract class CommitHandler(
} else if (currentTime - status.startTime > timeout) {
if (status.retriedTimes < maxRetries) {
logError(
- s"Ask worker($worker) CommitFiles for $shuffleId failed because
of Timeout" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will retry.")
retryCommitFiles(status, currentTime)
} else {
logError(
- s"Ask worker($worker) CommitFiles for $shuffleId failed because
of Timeout" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not
retry.")
}
}
@@ -382,7 +386,7 @@ abstract class CommitHandler(
while (iter.hasNext) {
val status = iter.next()
logError(
- s"Ask worker(${status.workerInfo}) CommitFiles for $shuffleId timed
out")
+ s"Ask worker(${status.workerInfo.readableAddress()}) CommitFiles for
$shuffleId timed out")
val res = createFailResponse(status)
processResponse(res, status.workerInfo)
iter.remove()
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 7b87be40b..647ef280b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -112,7 +112,7 @@ public class TransportClientFactory implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
- logger.info("mode " + ioMode + " threads " + conf.clientThreads());
+ logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode,
conf.clientThreads());
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(),
conf.getModuleName() + "-client");
this.pooledAllocator =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
index 96ff20971..373f36565 100644
---
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
@@ -181,10 +181,22 @@ class WorkerPartitionLocationInfo extends Logging {
}
def toStringSimplified: String = {
+ val primaryLocationStr = new StringBuilder
+ for ((shuffleKey, locations) <- primaryPartitionLocations.asScala) {
+ if (!locations.isEmpty) {
+ primaryLocationStr.append(s"($shuffleKey: ${locations.keySet()}) ")
+ }
+ }
+ val replicaLocationStr = new StringBuilder
+ for ((shuffleKey, locations) <- replicaPartitionLocations.asScala) {
+ if (!locations.isEmpty) {
+ replicaLocationStr.append(s"($shuffleKey: ${locations.keySet()}) ")
+ }
+ }
s"""
| Partition Location Info:
- | primary:
${primaryPartitionLocations.values().asScala.map(_.keySet().asScala)}
- | replica:
${replicaPartitionLocations.values().asScala.map(_.keySet().asScala)}
+ | primary: $primaryLocationStr
+ | replica: $replicaLocationStr
|""".stripMargin
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index 5b7320726..c93203c36 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -217,7 +217,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
rpcSource: RpcSource)
val numThreads =
nettyEnv.celebornConf.rpcDispatcherNumThreads(availableCores, role)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads,
"celeborn-dispatcher")
- logInfo(s"Dispatcher numThreads: $numThreads")
+ logInfo(s"Celeborn dispatcher numThreads: $numThreads")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 0b7adcb91..db368aa21 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -272,8 +272,8 @@ public class MemoryManager {
+ "max direct memory: {}, pause push memory: {}, "
+ "pause replication memory: {}, "
+ "read buffer memory limit: {} target: {}, "
- + "memory shuffle storage limit: {},"
- + "resume memory ratio: {},",
+ + "memory shuffle storage limit: {}, "
+ + "resume memory ratio: {}",
Utils.bytesToString(maxDirectMemory),
Utils.bytesToString(pausePushDataThreshold),
Utils.bytesToString(pauseReplicateThreshold),
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 0b0c53c1f..0481b274e 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -542,7 +542,7 @@ private[deploy] class Controller(
s"${emptyFilePrimaryIds.size()} empty primary partitions
${emptyFilePrimaryIds.asScala.mkString(",")}, " +
s"${failedPrimaryIds.size()} failed primary partitions, " +
s"${committedReplicaIds.size()} committed replica partitions, " +
- s"${emptyFileReplicaIds.size()} empty replica partitions
${emptyFileReplicaIds.asScala.mkString(",")} , " +
+ s"${emptyFileReplicaIds.size()} empty replica partitions
${emptyFileReplicaIds.asScala.mkString(",")}, " +
s"${failedReplicaIds.size()} failed replica partitions.")
CommitFilesResponse(
StatusCode.SUCCESS,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 00c332ce0..c2f1c66a1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -1049,8 +1049,12 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
diskInfo.dirs
} else {
- logDebug(s"Disk unavailable for $suggestedMountPoint, return all
healthy" +
- s" working dirs. diskInfo $diskInfo")
+ if (suggestedMountPoint.isEmpty) {
+ logDebug(s"Location suggestedMountPoint is not set, return all
healthy working dirs.")
+ } else {
+ logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for
$suggestedMountPoint, return all healthy" +
+ s" working dirs.")
+ }
healthyWorkingDirs()
}
if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty) {