zuston commented on code in PR #383:
URL: https://github.com/apache/incubator-uniffle/pull/383#discussion_r1040562696


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -143,86 +143,98 @@ public void addToFlushQueue(ShuffleDataFlushEvent event) {
   }
 
   private void flushToFile(ShuffleDataFlushEvent event) {
-
-    Storage storage = storageManager.selectStorage(event);
-    if (storage != null && !storage.canWrite()) {
-      addPendingEvents(event);
-      return;
-    }
-
     long start = System.currentTimeMillis();
-    List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
     boolean writeSuccess = false;
-    try {
-      // storage info maybe null if the application cache was cleared already
-      if (storage != null) {
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: " + event);
-        } else if (!event.isValid()) {
-          //  avoid printing error log
+
+    while (event.getRetryTimes() <= retryMax) {
+      try {
+        if (!event.isValid()) {
           writeSuccess = true;
           LOG.warn("AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
-        } else {
-          String user = StringUtils.defaultString(
-              
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-              StringUtils.EMPTY
-          );
-          CreateShuffleWriteHandlerRequest request = new 
CreateShuffleWriteHandlerRequest(
-                  storageType,
-                  event.getAppId(),
-                  event.getShuffleId(),
-                  event.getStartPartition(),
-                  event.getEndPartition(),
-                  storageBasePaths.toArray(new 
String[storageBasePaths.size()]),
-                  shuffleServerId,
-                  hadoopConf,
-                  storageDataReplica,
-                  user);
-          ShuffleWriteHandler handler = 
storage.getOrCreateWriteHandler(request);
-          do {
-            if (event.getRetryTimes() > retryMax) {
-              LOG.error("Failed to write data for " + event + " in " + 
retryMax + " times, shuffle data will be lost");
-              
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
-              break;
-            }
-            if (!event.isValid()) {
-              LOG.warn("AppId {} was removed already, event {} should be 
dropped, may leak one handler",
-                  event.getAppId(), event);
-              //  avoid printing error log
-              writeSuccess = true;
-              break;
-            }
+          break;
+        }
 
-            writeSuccess = storageManager.write(storage, handler, event);
+        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+        if (blocks == null || blocks.isEmpty()) {
+          LOG.info("There is no block to be flushed: {}", event);
+          break;
+        }
 
-            if (writeSuccess) {
-              updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
-              
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+        Storage storage = storageManager.selectStorage(event);
+        if (storage == null) {
+          break;
+        }
+
+        if (!storage.canWrite()) {
+          // todo: Could we add an interface supportPending for storageManager
+          //       to unify the following logic across the different storage 
managers?
+          if (storageManager instanceof MultiStorageManager) {
+            event.increaseRetryTimes();
+            
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+            continue;
+          } else {
+            // To avoid being re-pushed to the pending queue and putting 
too much pressure on the server,
+            // it's better to drop the event directly.
+            if (event.isPended()) {
+              LOG.error("Drop this event directly due to already having 
entered pending queue. event: {}", event);
               break;
-            } else {
-              event.increaseRetryTimes();
-              
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
             }
-          } while (event.getRetryTimes() <= retryMax);
+            event.increaseRetryTimes();
+            event.markPended();
+            addPendingEvents(event);
+            return;
+          }
         }
-      }
-    } catch (Exception e) {
-      // just log the error, don't throw the exception and stop the flush 
thread
-      LOG.error("Exception happened when process flush shuffle data for " + 
event, e);
-    } finally {
-      cleanupFlushEventData(event);
-      if (shuffleServer != null) {
-        long duration = System.currentTimeMillis() - start;
+
+        String user = StringUtils.defaultString(
+            
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+            StringUtils.EMPTY
+        );
+        CreateShuffleWriteHandlerRequest request = new 
CreateShuffleWriteHandlerRequest(
+            storageType,
+            event.getAppId(),
+            event.getShuffleId(),
+            event.getStartPartition(),
+            event.getEndPartition(),
+            storageBasePaths.toArray(new String[storageBasePaths.size()]),
+            shuffleServerId,
+            hadoopConf,
+            storageDataReplica,
+            user);
+        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+        writeSuccess = storageManager.write(storage, handler, event);
         if (writeSuccess) {
-          LOG.debug("Flush to file success in " + duration + " ms and release 
" + event.getSize() + " bytes");
+          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
blocks);
+          
ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+          break;
         } else {
-          LOG.error("Flush to file for " + event + " failed in "
-              + duration + " ms and release " + event.getSize() + " bytes");
+          event.increaseRetryTimes();
+          
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
         }
+      } catch (Throwable throwable) {
+        // just log the error, don't throw the exception and stop the flush 
thread
+        LOG.error("Exception happened when process flush shuffle data for {}", 
event, throwable);
+        event.increaseRetryTimes();
       }
     }
-  }
 
+    if (event.getRetryTimes() > retryMax) {
+      LOG.error("Failed to write data for {} in {} times, shuffle data will be 
lost", event, retryMax);
+      
ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());

Review Comment:
   Nice catch. Fixed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to