This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 14593e0df [MINOR] Batch few celeborn client logs
14593e0df is described below
commit 14593e0df7d4fa373902d778915394891899ecc3
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Jun 17 15:34:48 2025 -0700
[MINOR] Batch few celeborn client logs
### What changes were proposed in this pull request?
Batch a few log lines on the Celeborn client side that emit too much data into
the Spark driver logs for large applications.
- The change-partition path prints updated partitions for each worker
individually, producing too many lines when many workers are involved in a shuffle.
```
25/06/02 08:04:29 INFO LifecycleManager: Reserve buffer success for shuffleId 6
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2477 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1285 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2660 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2194 epoch from 0 to 1), (partition 1760 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1429 epoch from 0 to 1), (partition 2300 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 517 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2627 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2901 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2903 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2067 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 569 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2633 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2813 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1817 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 148 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2098 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1554 epoch from 0 to 1)].
```
- The clear-shuffle path prints each shuffle id individually.
```
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 0.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 1.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 2.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 3.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 4.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 5.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 6.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 7.
```
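Both batched lines can be sketched with Scala's `mkString`, which collapses a collection of per-item messages into one log line. The sketch below is illustrative only; `PartitionChange` and the literal ids are stand-ins, not the actual Celeborn internals.

```scala
object BatchLogSketch {
  // Hypothetical stand-in for an updated partition: id plus its new epoch.
  case class PartitionChange(id: Int, epoch: Int)

  def main(args: Array[String]): Unit = {
    // One "[Update partition]" line instead of one line per partition.
    val changes = Seq(PartitionChange(2477, 1), PartitionChange(1285, 1))
    val changeMsg = changes
      .map(c => s"(partition ${c.id} epoch from ${c.epoch - 1} to ${c.epoch})")
      .mkString("[", ", ", "]")
    println(s"[Update partition] success for shuffle 6, succeed partitions: $changeMsg.")

    // One "Clear shuffleIds" line instead of one "Clear shuffle N." per id.
    val shuffleIdsToRemove = Seq(0, 1, 2, 3)
    println(s"Clear shuffleIds: (${shuffleIdsToRemove.mkString(", ")}).")
  }
}
```

With many workers or many expired shuffles, this turns O(n) log lines into a single line whose length grows with n, which is the trade-off the PR makes.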
### Why are the changes needed?
Both of the above logs get printed frequently on the Celeborn client and can
each be merged into a single line.
### Does this PR introduce _any_ user-facing change?
Client logs will change slightly.
### How was this patch tested?
NA
Closes #3313 from s0nskar/improve_logs.
Lead-authored-by: Sanskar Modi <[email protected]>
Co-authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit d44242ec2019bedf3d27047b6c069c68c7deeaab)
Signed-off-by: Wang, Fei <[email protected]>
---
.../celeborn/client/ChangePartitionManager.scala | 18 +++++------
.../apache/celeborn/client/LifecycleManager.scala | 37 ++++++++++++----------
2 files changed, 29 insertions(+), 26 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index 5b8e4ce66..71096a952 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -394,19 +394,19 @@ class ChangePartitionManager(
// partition location can be null when call reserveSlotsWithRetry().
val locations = (primaryLocations.asScala ++
replicaLocations.asScala.map(_.getPeer))
.distinct.filter(_ != null)
- if (locations.nonEmpty) {
- val changes = locations.map { partition =>
- s"(partition ${partition.getId} epoch from ${partition.getEpoch - 1} to ${partition.getEpoch})"
- }.mkString("[", ", ", "]")
- logInfo(s"[Update partition] success for " +
- s"shuffle $shuffleId, succeed partitions: " +
- s"$changes.")
- }
-
// TODO: should record the new partition locations and acknowledge the new partitionLocations to downstream task,
// in scenario the downstream task start early before the upstream task.
locations
}
+
+ if (newPrimaryLocations.nonEmpty) {
+ val changes = newPrimaryLocations.map { partition =>
+ s"(partition ${partition.getId} epoch from ${partition.getEpoch - 1} to ${partition.getEpoch})"
+ }.mkString("[", ", ", "]")
+ logInfo(s"[Update partition] success for " +
+ s"shuffle $shuffleId, succeed partitions: " +
+ s"$changes.")
+ }
replySuccess(newPrimaryLocations.toArray)
}
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 32cee3dda..9063ce7b8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1689,10 +1689,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private def removeExpiredShuffle(): Unit = {
val currentTime = System.currentTimeMillis()
- val batchRemoveShuffleIds = new ArrayBuffer[Integer]
+ val shuffleIdsToRemove = new ArrayBuffer[Integer]
unregisterShuffleTime.keys().asScala.foreach { shuffleId =>
if (unregisterShuffleTime.get(shuffleId) < currentTime - shuffleExpiredCheckIntervalMs) {
- logInfo(s"Clear shuffle $shuffleId.")
+ shuffleIdsToRemove += shuffleId
// clear for the shuffle
registeredShuffle.remove(shuffleId)
registeringShuffleRequest.remove(shuffleId)
@@ -1700,28 +1700,31 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
latestPartitionLocation.remove(shuffleId)
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
- if (!batchRemoveExpiredShufflesEnabled) {
+ invalidatedBroadcastGetReducerFileGroupResponse(shuffleId)
+ }
+ }
+
+ if (shuffleIdsToRemove.nonEmpty) {
+ logInfo(s"Clear shuffleIds: (${shuffleIdsToRemove.mkString(", ")}).")
+ if (!batchRemoveExpiredShufflesEnabled) {
+ shuffleIdsToRemove.foreach { shuffleId =>
val unregisterShuffleResponse = requestMasterUnregisterShuffle(
UnregisterShuffle(appUniqueId, shuffleId, MasterClient.genRequestId()))
// if unregister shuffle not success, wait next turn
if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
unregisterShuffleTime.remove(shuffleId)
}
- } else {
- batchRemoveShuffleIds += shuffleId
}
- invalidatedBroadcastGetReducerFileGroupResponse(shuffleId)
- }
- }
- if (batchRemoveShuffleIds.nonEmpty) {
- val unregisterShuffleResponse = batchRequestMasterUnregisterShuffles(
- BatchUnregisterShuffles(
- appUniqueId,
- batchRemoveShuffleIds.asJava,
- MasterClient.genRequestId()))
- if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
- batchRemoveShuffleIds.foreach { shuffleId: Integer =>
- unregisterShuffleTime.remove(shuffleId)
+ } else {
+ val unregisterShuffleResponse = batchRequestMasterUnregisterShuffles(
+ BatchUnregisterShuffles(
+ appUniqueId,
+ shuffleIdsToRemove.asJava,
+ MasterClient.genRequestId()))
+ if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
+ shuffleIdsToRemove.foreach { shuffleId: Integer =>
+ unregisterShuffleTime.remove(shuffleId)
+ }
}
}
}