This is an automated email from the ASF dual-hosted git repository.

smallzhongfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 931d6cda0 [#1252] fix(server): Incorrect storage write fail metric (#1253)
931d6cda0 is described below

commit 931d6cda0e3a691867721f51eabb567d568831e8
Author: summaryzb <[email protected]>
AuthorDate: Wed Oct 25 19:35:58 2023 +0800

    [#1252] fix(server): Incorrect storage write fail metric (#1253)
    
    What changes were proposed in this pull request?
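    Fix the storage write-failure metric: previously an event that was written successfully on its
    last retry was still logged as lost and counted by incStorageFailedCounter. Failure metrics are
    now recorded only when an event finally fails, and the retry metric only when the event is
    actually re-queued.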
    
    Why are the changes needed?
    Fix: #1252
    
    Does this PR introduce any user-facing change?
    No.
    
    How was this patch tested?
    unit test
---
 .../apache/uniffle/server/ShuffleFlushManager.java | 95 +++++++++++++---------
 1 file changed, 57 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 036ad69d0..eec574179 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -83,39 +83,41 @@ public class ShuffleFlushManager {
     eventHandler.handle(event);
   }
 
+  private void recordFinalFail(ShuffleDataFlushEvent event, long start) { // retries exhausted
+    LOG.error(
+        "Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax);
+    if (event.getUnderStorage() != null) { // tolerate a null underStorage
+      ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
+    }
+    event.doCleanup(); // data is dropped, not re-queued
+    if (shuffleServer != null) { // metric + log only when shuffleServer is set
+      long duration = System.currentTimeMillis() - start; // start: flushToFile entry time
+      ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
+      LOG.error(
+          "Flush to file for {} failed in {} ms and release {} bytes",
+          event,
+          duration,
+          event.getSize());
+    }
+  }
+
+  private void recordSuccess(ShuffleDataFlushEvent event, long start) { // write succeeded
+    updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
+    ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost());
+    event.doCleanup(); // NOTE(review): no null check on underStorage above
+    if (shuffleServer != null) {
+      if (LOG.isDebugEnabled()) { // avoid computing duration unless debug is on
+        long duration = System.currentTimeMillis() - start;
+        LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+      }
+    }
+  }
+
   public void processEvent(ShuffleDataFlushEvent event) {
     try {
       ShuffleServerMetrics.gaugeWriteHandler.inc();
-      long start = System.currentTimeMillis();
-      boolean writeSuccess = flushToFile(event);
-      if (writeSuccess || event.getRetryTimes() > retryMax) {
-        if (event.getRetryTimes() > retryMax) {
-          LOG.error(
-              "Failed to write data for {} in {} times, shuffle data will be 
lost",
-              event,
-              retryMax);
-          if (event.getUnderStorage() != null) {
-            
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
-          }
-        }
-        event.doCleanup();
-        if (shuffleServer != null) {
-          long duration = System.currentTimeMillis() - start;
-          if (writeSuccess) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Flush to file success in {} ms and release {} bytes", 
duration, event.getSize());
-            }
-          } else {
-            ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
-            LOG.error(
-                "Flush to file for {} failed in {} ms and release {} bytes",
-                event,
-                duration,
-                event.getSize());
-          }
-        }
-      }
+      flushToFile(event);
+      // for thread safety we should not use or change any event info when 
write to file is failed
     } catch (Exception e) {
       LOG.error("Exception happened when flush data for " + event, e);
     } finally {
@@ -124,7 +126,12 @@ public class ShuffleFlushManager {
     }
   }
 
+  private boolean reachRetryMax(ShuffleDataFlushEvent event) { // retries exceeded retryMax?
+    return event.getRetryTimes() > retryMax; // strict: retryTimes == retryMax not exhausted yet
+  }
+
   private boolean flushToFile(ShuffleDataFlushEvent event) {
+    long start = System.currentTimeMillis();
     boolean writeSuccess = false;
 
     try {
@@ -160,7 +167,7 @@ public class ShuffleFlushManager {
       if (!storage.canWrite()) {
         // todo: Could we add an interface supportPending for storageManager
         //       to unify following logic of multiple different storage 
managers
-        if (event.getRetryTimes() <= retryMax) {
+        if (!reachRetryMax(event)) {
           if (event.isPended()) {
             LOG.error(
                 "Drop this event directly due to already having entered 
pending queue. event: {}",
@@ -168,9 +175,15 @@ public class ShuffleFlushManager {
             return true;
           }
           event.increaseRetryTimes();
-          
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
           event.markPended();
-          eventHandler.handle(event);
+          if (!reachRetryMax(event)) {
+            
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+            eventHandler.handle(event);
+          } else {
+            recordFinalFail(event, start);
+          }
+        } else {
+          recordFinalFail(event, start);
         }
         return false;
       }
@@ -196,23 +209,29 @@ public class ShuffleFlushManager {
       ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
       writeSuccess = storageManager.write(storage, handler, event);
       if (writeSuccess) {
-        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
-        
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-      } else if (event.getRetryTimes() <= retryMax) {
+        recordSuccess(event, start);
+      } else if (!reachRetryMax(event)) {
         if (event.isPended()) {
           LOG.error(
               "Drop this event directly due to already having entered pending 
queue. event: {}",
               event);
         }
         event.increaseRetryTimes();
-        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
         event.markPended();
-        eventHandler.handle(event);
+        if (!reachRetryMax(event)) {
+          
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+          eventHandler.handle(event);
+        } else {
+          recordFinalFail(event, start);
+        }
       }
     } catch (Throwable throwable) {
       // just log the error, don't throw the exception and stop the flush 
thread
       LOG.error("Exception happened when process flush shuffle data for {}", 
event, throwable);
       event.increaseRetryTimes();
+      if (reachRetryMax(event)) {
+        recordFinalFail(event, start);
+      }
     }
     return writeSuccess;
   }

Reply via email to