This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 14593e0df [MINOR] Batch few celeborn client logs
14593e0df is described below

commit 14593e0df7d4fa373902d778915394891899ecc3
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Jun 17 15:34:48 2025 -0700

    [MINOR] Batch few celeborn client logs
    
    ### What changes were proposed in this pull request?
    
    Batching a few log lines on the Celeborn client side, which log too much 
data in Spark driver logs for large applications.
    
    - The change-partition log prints the updated partitions for each worker 
individually and prints too many lines if many workers are involved in a shuffle.
    ```
    25/06/02 08:04:29 INFO LifecycleManager: Reserve buffer success for 
shuffleId 6
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2477 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 1285 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2660 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2194 epoch from 0 to 1), 
(partition 1760 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 1429 epoch from 0 to 1), 
(partition 2300 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 517 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2627 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2901 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2903 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2067 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 569 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2633 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2813 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 1817 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 148 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 2098 epoch from 0 to 1)].
    25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success 
for shuffle 6, succeed partitions: [(partition 1554 epoch from 0 to 1)].
    ```
    -  The clear-shuffle log prints each shuffle id individually
    
    ```
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 0.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 1.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 2.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 3.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 4.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 5.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 6.
    25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 7.
    ```
    
    ### Why are the changes needed?
    
    Both of the above logs get printed a lot on the Celeborn client and can be 
merged into a single line.
    
    ### Does this PR introduce _any_ user-facing change?
    Client logs will have a slight change.
    
    ### How was this patch tested?
    NA
    
    Closes #3313 from s0nskar/improve_logs.
    
    Lead-authored-by: Sanskar Modi <[email protected]>
    Co-authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit d44242ec2019bedf3d27047b6c069c68c7deeaab)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../celeborn/client/ChangePartitionManager.scala   | 18 +++++------
 .../apache/celeborn/client/LifecycleManager.scala  | 37 ++++++++++++----------
 2 files changed, 29 insertions(+), 26 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index 5b8e4ce66..71096a952 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -394,19 +394,19 @@ class ChangePartitionManager(
         // partition location can be null when call reserveSlotsWithRetry().
         val locations = (primaryLocations.asScala ++ 
replicaLocations.asScala.map(_.getPeer))
           .distinct.filter(_ != null)
-        if (locations.nonEmpty) {
-          val changes = locations.map { partition =>
-            s"(partition ${partition.getId} epoch from ${partition.getEpoch - 
1} to ${partition.getEpoch})"
-          }.mkString("[", ", ", "]")
-          logInfo(s"[Update partition] success for " +
-            s"shuffle $shuffleId, succeed partitions: " +
-            s"$changes.")
-        }
-
         // TODO: should record the new partition locations and acknowledge the 
new partitionLocations to downstream task,
         //  in scenario the downstream task start early before the upstream 
task.
         locations
     }
+
+    if (newPrimaryLocations.nonEmpty) {
+      val changes = newPrimaryLocations.map { partition =>
+        s"(partition ${partition.getId} epoch from ${partition.getEpoch - 1} 
to ${partition.getEpoch})"
+      }.mkString("[", ", ", "]")
+      logInfo(s"[Update partition] success for " +
+        s"shuffle $shuffleId, succeed partitions: " +
+        s"$changes.")
+    }
     replySuccess(newPrimaryLocations.toArray)
   }
 
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 32cee3dda..9063ce7b8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1689,10 +1689,10 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
 
   private def removeExpiredShuffle(): Unit = {
     val currentTime = System.currentTimeMillis()
-    val batchRemoveShuffleIds = new ArrayBuffer[Integer]
+    val shuffleIdsToRemove = new ArrayBuffer[Integer]
     unregisterShuffleTime.keys().asScala.foreach { shuffleId =>
       if (unregisterShuffleTime.get(shuffleId) < currentTime - 
shuffleExpiredCheckIntervalMs) {
-        logInfo(s"Clear shuffle $shuffleId.")
+        shuffleIdsToRemove += shuffleId
         // clear for the shuffle
         registeredShuffle.remove(shuffleId)
         registeringShuffleRequest.remove(shuffleId)
@@ -1700,28 +1700,31 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
         latestPartitionLocation.remove(shuffleId)
         commitManager.removeExpiredShuffle(shuffleId)
         changePartitionManager.removeExpiredShuffle(shuffleId)
-        if (!batchRemoveExpiredShufflesEnabled) {
+        invalidatedBroadcastGetReducerFileGroupResponse(shuffleId)
+      }
+    }
+
+    if (shuffleIdsToRemove.nonEmpty) {
+      logInfo(s"Clear shuffleIds: (${shuffleIdsToRemove.mkString(", ")}).")
+      if (!batchRemoveExpiredShufflesEnabled) {
+        shuffleIdsToRemove.foreach { shuffleId =>
           val unregisterShuffleResponse = requestMasterUnregisterShuffle(
             UnregisterShuffle(appUniqueId, shuffleId, 
MasterClient.genRequestId()))
           // if unregister shuffle not success, wait next turn
           if (StatusCode.SUCCESS == 
StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
             unregisterShuffleTime.remove(shuffleId)
           }
-        } else {
-          batchRemoveShuffleIds += shuffleId
         }
-        invalidatedBroadcastGetReducerFileGroupResponse(shuffleId)
-      }
-    }
-    if (batchRemoveShuffleIds.nonEmpty) {
-      val unregisterShuffleResponse = batchRequestMasterUnregisterShuffles(
-        BatchUnregisterShuffles(
-          appUniqueId,
-          batchRemoveShuffleIds.asJava,
-          MasterClient.genRequestId()))
-      if (StatusCode.SUCCESS == 
StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
-        batchRemoveShuffleIds.foreach { shuffleId: Integer =>
-          unregisterShuffleTime.remove(shuffleId)
+      } else {
+        val unregisterShuffleResponse = batchRequestMasterUnregisterShuffles(
+          BatchUnregisterShuffles(
+            appUniqueId,
+            shuffleIdsToRemove.asJava,
+            MasterClient.genRequestId()))
+        if (StatusCode.SUCCESS == 
StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
+          shuffleIdsToRemove.foreach { shuffleId: Integer =>
+            unregisterShuffleTime.remove(shuffleId)
+          }
         }
       }
     }

Reply via email to