This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 082f0dd8c [CELEBORN-1775][FOLLOWUP] Improve logging around commit files
082f0dd8c is described below
commit 082f0dd8c5dc93f57d9234a5742690c29610ac88
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed May 21 16:37:38 2025 -0700
[CELEBORN-1775][FOLLOWUP] Improve logging around commit files
### What changes were proposed in this pull request?
Minor logging improvements around commit files so that the shuffle key (`shuffleKey`) is included in the log messages.
### Why are the changes needed?
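Several commit-files log messages either logged only the bare `shuffleId` or did not identify the shuffle at all. This made it hard to correlate commit failures, timeouts, and retries with a specific application's shuffle.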
### Does this PR introduce _any_ user-facing change?
Only in log output: some commit-files log messages now include the shuffle key and use reworded text.
### How was this patch tested?
N/A (logging-only change; no new tests were added).
Closes #3270 from s0nskar/CELEBORN-1775.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../apache/celeborn/client/commit/CommitHandler.scala | 17 +++++++++--------
.../celeborn/service/deploy/worker/Controller.scala | 11 ++++++-----
2 files changed, 15 insertions(+), 13 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index c6f535d33..87fb3564c 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -330,6 +330,7 @@ abstract class CommitHandler(
while (iter.hasNext) {
val status = iter.next()
val worker = status.workerInfo
+ val shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId)
if (status.future.isCompleted) {
status.future.value.get match {
case scala.util.Success(res) =>
@@ -337,10 +338,10 @@ abstract class CommitHandler(
case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS |
StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED |
StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
if (res.status == StatusCode.SUCCESS) {
logDebug(s"Request commitFiles return ${res.status} for " +
- s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from
worker ${worker.readableAddress()}")
+ s"$shuffleKey from worker ${worker.readableAddress()}")
} else {
logWarning(s"Request commitFiles return ${res.status} for
" +
- s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from
worker ${worker.readableAddress()}")
+ s"$shuffleKey from worker ${worker.readableAddress()}")
if (res.status != StatusCode.WORKER_EXCLUDED) {
commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
}
@@ -350,12 +351,12 @@ abstract class CommitHandler(
case StatusCode.COMMIT_FILES_MOCK_FAILURE =>
if (status.retriedTimes < maxRetries) {
logError(s"Request commitFiles return ${res.status} for " +
- s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for
${status.retriedTimes}/$maxRetries, will retry")
+ s"$shuffleKey for ${status.retriedTimes}/$maxRetries,
will retry")
retryCommitFiles(status, currentTime)
} else {
logError(
s"Request commitFiles return
${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
- s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for
${status.retriedTimes}/$maxRetries, will not retry")
+ s"$shuffleKey for ${status.retriedTimes}/$maxRetries,
will not retry")
val res = createFailResponse(status)
processResponse(res, worker)
iter.remove()
@@ -367,13 +368,13 @@ abstract class CommitHandler(
case scala.util.Failure(e) =>
if (status.retriedTimes < maxRetries) {
logError(
- s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleKey failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will
retry.",
e)
retryCommitFiles(status, currentTime)
} else {
logError(
- s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleKey failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not
retry.",
e)
val res = createFailResponse(status)
@@ -384,12 +385,12 @@ abstract class CommitHandler(
} else if (currentTime - status.startTime > timeout) {
if (status.retriedTimes < maxRetries) {
logError(
- s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed because of Timeout" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleKey failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will retry.")
retryCommitFiles(status, currentTime)
} else {
logError(
- s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleId failed because of Timeout" +
+ s"Ask worker(${worker.readableAddress()}) CommitFiles for
$shuffleKey failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not
retry.")
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 52650cb6c..a6870c836 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -620,19 +620,20 @@ private[deploy] class Controller(
new BiFunction[Void, Throwable, Unit] {
override def apply(v: Void, t: Throwable): Unit = {
if (null != t) {
+ val errMsg = s"Exception while handling commitFiles for
shuffleId: $shuffleKey"
t match {
case _: CancellationException =>
- logWarning("While handling commitFiles, canceled.")
+ logWarning(s"$errMsg, operation was cancelled.")
case ee: ExecutionException =>
- logError("While handling commitFiles, ExecutionException
raised.", ee)
+ logError(s"$errMsg, ExecutionException was raised.", ee)
case ie: InterruptedException =>
- logWarning("While handling commitFiles, interrupted.")
+ logWarning(s"$errMsg, operation was interrupted.")
Thread.currentThread().interrupt()
throw ie
case _: TimeoutException =>
- logWarning(s"While handling commitFiles, timeout after
$shuffleCommitTimeout ms.")
+ logWarning(s"$errMsg, operation timed out after
$shuffleCommitTimeout ms.")
case throwable: Throwable =>
- logError("While handling commitFiles, exception occurs.",
throwable)
+ logError(s"$errMsg, an unexpected exception occurred.",
throwable)
}
commitInfo.synchronized {
commitInfo.response = CommitFilesResponse(