This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d20d88d2 [#2073] fix(server): Throws EventRetryException when error 
happens on creating write handler (#2074)
2d20d88d2 is described below

commit 2d20d88d2da5ce57dce389d37cd54790be64bf82
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Sep 2 17:37:14 2024 +0800

    [#2073] fix(server): Throws EventRetryException when error happens on 
creating write handler (#2074)
    
    ### What changes were proposed in this pull request?
    
    Throw an EventRetryException if an exception is encountered while creating 
the write handler.
    
    ### Why are the changes needed?
    
    Fix: #2073
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI
---
 .../uniffle/server/DefaultFlushEventHandler.java   |  4 +-
 .../apache/uniffle/server/ShuffleFlushManager.java |  9 ++-
 .../uniffle/server/ShuffleFlushManagerTest.java    | 64 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index e0a47526f..024fc6b5d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -122,9 +122,9 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
         return;
       }
 
+      ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+      ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
       if (e instanceof EventDiscardException) {
-        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
-        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
         if (storage != null) {
           
ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
         }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index a80267546..c08c358c0 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -169,7 +169,14 @@ public class ShuffleFlushManager {
               storageDataReplica,
               user,
               maxConcurrencyPerPartitionToWrite);
-      ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+      ShuffleWriteHandler handler;
+      try {
+        handler = storage.getOrCreateWriteHandler(request);
+      } catch (Exception e) {
+        LOG.warn("Failed to create write handler for event: {}", event, e);
+        throw new EventRetryException(e);
+      }
+
       long startTime = System.currentTimeMillis();
       boolean writeSuccess = storageManager.write(storage, handler, event);
       if (!writeSuccess) {
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2b21445e3..59e92e320 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -19,8 +19,13 @@ package org.apache.uniffle.server;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFileAttributeView;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -40,6 +45,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -58,9 +64,11 @@ import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.server.storage.HadoopStorageManager;
+import org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy;
 import org.apache.uniffle.server.storage.HybridStorageManager;
 import org.apache.uniffle.server.storage.LocalStorageManager;
 import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
+import org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
 import org.apache.uniffle.storage.HadoopTestBase;
@@ -280,6 +288,62 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
   }
 
+  @Test
+  public void testCreateWriteHandlerFailed(@TempDir File tmpDir) throws 
Exception {
+    String appId = "testCreateWriteHandlerFailed_appId";
+    File localStorageDirectory = new File(tmpDir, appId + "_rssdata");
+    localStorageDirectory.mkdirs();
+    EnumSet<PosixFilePermission> perms =
+        EnumSet.of(
+            PosixFilePermission.OWNER_READ,
+            PosixFilePermission.OWNER_EXECUTE,
+            PosixFilePermission.GROUP_EXECUTE,
+            PosixFilePermission.OTHERS_READ,
+            PosixFilePermission.OTHERS_EXECUTE);
+    Files.getFileAttributeView(localStorageDirectory.toPath(), 
PosixFileAttributeView.class)
+        .setPermissions(perms);
+
+    ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+    shuffleServerConf.set(
+        ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
+        RotateStorageManagerFallbackStrategy.class.getCanonicalName());
+    shuffleServerConf.set(
+        ShuffleServerConf.RSS_STORAGE_BASE_PATH,
+        Lists.newArrayList(localStorageDirectory.getAbsolutePath()));
+    shuffleServerConf.setString(
+        shuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE_HDFS.name());
+    ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
+    when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
+        .thenReturn(rsLock.readLock());
+
+    StorageManager storageManager =
+        
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    storageManager.registerRemoteStorage(appId, remoteStorage);
+    ShuffleFlushManager manager =
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
+    ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId, 1, 1, 1, 
null);
+
+    manager.addToFlushQueue(event1);
+    waitForFlush(manager, appId, 1, 5);
+    assertEquals(1, event1.getRetryTimes());
+    List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+    assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId, 
1).getLongCardinality());
+
+    int maxRetryTimes = 5;
+    shuffleServerConf.set(ShuffleServerConf.SERVER_WRITE_RETRY_MAX, 
maxRetryTimes);
+    shuffleServerConf.set(
+        ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
+        HadoopStorageManagerFallbackStrategy.class.getCanonicalName());
+    storageManager = 
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    storageManager.registerRemoteStorage(appId, remoteStorage);
+    manager = new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
+    ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId, 1, 1, 1, 
null);
+    manager.addToFlushQueue(event2);
+    Awaitility.await()
+        .timeout(Duration.ofSeconds(20))
+        .until(() -> event2.getRetryTimes() == maxRetryTimes + 1);
+  }
+
   @Test
   public void localMetricsTest(@TempDir File tempDir) throws Exception {
     shuffleServerConf.set(

Reply via email to