This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ec218878 [CELEBORN-876] Enhance log to find out failed workers if 
data lost
3ec218878 is described below

commit 3ec218878afc2285015405b9f7332dbe0afd694b
Author: mingji <[email protected]>
AuthorDate: Tue Aug 8 18:20:41 2023 +0800

    [CELEBORN-876] Enhance log to find out failed workers if data lost
    
    ### What changes were proposed in this pull request?
    1. Log offer slots results from LifecycleManager.
    2. Log change partition results from LifecycleManager.
    3. Log reserve slots results.
    4. Log a failure to fetch file groups as such, instead of reporting it as data loss.
    
    ### Why are the changes needed?
    If data loss happens, we need to find out which worker caused the failure,
so we need to check the reserve slots results from LifecycleManager.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #1798 from FMX/CELEBORN-876.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../plugin/flink/readclient/FlinkShuffleClientImpl.java        |  6 +++++-
 .../org/apache/celeborn/client/ChangePartitionManager.scala    |  2 +-
 .../scala/org/apache/celeborn/client/LifecycleManager.scala    | 10 ++++------
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index f1d5cc5b9..876c4fdcd 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -177,7 +177,11 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
         } else {
           // refresh file groups
           ReduceFileGroups newGroups = loadFileGroupInternal(shuffleId);
-          if (newGroups == null || 
!newGroups.partitionIds.contains(partitionId)) {
+          if (newGroups == null) {
+            throw new IOException(
+                "Load file group from lifecycle manager failed: "
+                    + Utils.makeReducerKey(shuffleId, partitionId));
+          } else if (!newGroups.partitionIds.contains(partitionId)) {
             throw new IOException(
                 "shuffle data lost for partition: " + 
Utils.makeReducerKey(shuffleId, partitionId));
           }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index 520a4d052..ca3b2077e 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -298,7 +298,7 @@ class ChangePartitionManager(
             val changes = locations.map { partition =>
               s"(partition ${partition.getId} epoch from ${partition.getEpoch 
- 1} to ${partition.getEpoch})"
             }.mkString("[", ", ", "]")
-            logDebug(s"[Update partition] success for " +
+            logInfo(s"[Update partition] success for " +
               s"shuffle $shuffleId, succeed partitions: " +
               s"$changes.")
           }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index dfe3d4165..6e052a0b9 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -403,16 +403,15 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
     res.status match {
       case StatusCode.REQUEST_FAILED =>
-        logDebug(s"OfferSlots RPC request failed for $shuffleId!")
+        logInfo(s"OfferSlots RPC request failed for $shuffleId!")
         reply(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, Array.empty))
         return
       case StatusCode.SLOT_NOT_AVAILABLE =>
-        logDebug(s"OfferSlots for $shuffleId failed!")
+        logInfo(s"OfferSlots for $shuffleId failed!")
         reply(RegisterShuffleResponse(StatusCode.SLOT_NOT_AVAILABLE, 
Array.empty))
         return
       case StatusCode.SUCCESS =>
-        logInfo(s"OfferSlots for $shuffleId Success!")
-        logDebug(s" Slots Info: ${res.workerResource}")
+        logInfo(s"OfferSlots for $shuffleId Success!Slots Info: 
${res.workerResource}")
       case _ => // won't happen
         throw new UnsupportedOperationException()
     }
@@ -458,8 +457,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       logError(s"reserve buffer for $shuffleId failed, reply to all.")
       reply(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED, 
Array.empty))
     } else {
-      logInfo(s"ReserveSlots for $shuffleId success!")
-      logDebug(s"Allocated Slots: $slots")
+      logInfo(s"ReserveSlots for $shuffleId success with details:$slots!")
       // Forth, register shuffle success, update status
       val allocatedWorkers =
         JavaUtils.newConcurrentHashMap[WorkerInfo, 
ShufflePartitionLocationInfo]()

Reply via email to