This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.10.bak
in repository https://gitbox.apache.org/repos/asf/uniffle.git

commit d2faa95668c9e97f4892a7855074840975af96de
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Tue Dec 17 11:25:13 2024 +0800

    [#2292] fix(server): Potential hang when HadoopShuffleWriteHandler initialization fails in PooledHadoopShuffleWriteHandler (#2293)
    
    ### What changes were proposed in this pull request?
    
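    Increment the initialized-handler counter in PooledHadoopShuffleWriteHandler only after a handler has been created successfully. Previously the counter was post-incremented as the argument to `createWriterFunc.apply(...)`, so it advanced even when handler creation threw. The pool then counted a handler that was never added to the queue, and subsequent writes could hang waiting for it.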
    
    ### Why are the changes needed?
    
    fix: #2292
    
    ### Does this PR introduce _any_ user-facing change?
    <!--
    (Please list the user-facing changes introduced by your change, including
      1. Change in user-facing APIs.
      2. Addition or removal of property keys.)
    -->
    No.
    
    ### How was this patch tested?
    
    Unit tests
    
    
    ---------
    
    Co-authored-by: Junfan Zhang <zhangjun...@qiyi.com>
---
 .../impl/PooledHadoopShuffleWriteHandler.java      | 12 ++++---
 .../impl/PooledHadoopShuffleWriteHandlerTest.java  | 39 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
index f46ebfdb2..f801516bf 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl;
 
 import java.util.Collection;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -49,7 +50,7 @@ public class PooledHadoopShuffleWriteHandler implements ShuffleWriteHandler {
   private final int maxConcurrency;
   private final String basePath;
   private Function<Integer, ShuffleWriteHandler> createWriterFunc;
-  private volatile int initializedHandlerCnt = 0;
+  private AtomicInteger initializedHandlerCntRef = new AtomicInteger(0);
 
   // Only for tests
   @VisibleForTesting
@@ -109,10 +110,11 @@ public class PooledHadoopShuffleWriteHandler implements ShuffleWriteHandler {
 
   @Override
   public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
-    if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
+    if (queue.isEmpty() && initializedHandlerCntRef.get() < maxConcurrency) {
       synchronized (this) {
-        if (initializedHandlerCnt < maxConcurrency) {
-          queue.add(createWriterFunc.apply(initializedHandlerCnt++));
+        if (initializedHandlerCntRef.get() < maxConcurrency) {
+          queue.add(createWriterFunc.apply(initializedHandlerCntRef.get()));
+          initializedHandlerCntRef.addAndGet(1);
         }
       }
     }
@@ -132,6 +134,6 @@ public class PooledHadoopShuffleWriteHandler implements ShuffleWriteHandler {
 
   @VisibleForTesting
   protected int getInitializedHandlerCnt() {
-    return initializedHandlerCnt;
+    return initializedHandlerCntRef.get();
   }
 }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
index 32e8786c8..460741db4 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class PooledHadoopShuffleWriteHandlerTest {
 
@@ -41,6 +42,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
     private List<Integer> invokedList;
     private int index;
     private Runnable execution;
+    private boolean markInitializationFail = false;
 
     FakedShuffleWriteHandler(List<Integer> invokedList, int index, Runnable 
runnable) {
       this.invokedList = invokedList;
@@ -48,6 +50,12 @@ public class PooledHadoopShuffleWriteHandlerTest {
       this.execution = runnable;
     }
 
+    FakedShuffleWriteHandler(boolean isMarkInitializationFail) {
+      if (isMarkInitializationFail) {
+        throw new RuntimeException("Fail to init");
+      }
+    }
+
     FakedShuffleWriteHandler(
         List<Integer> initializedList, List<Integer> invokedList, int index, 
Runnable runnable) {
       initializedList.add(index);
@@ -63,6 +71,37 @@ public class PooledHadoopShuffleWriteHandlerTest {
     }
   }
 
+  @Test
+  public void initializationFailureTest() throws Exception {
+    int maxConcurrency = 2;
+    LinkedBlockingDeque<ShuffleWriteHandler> deque = new 
LinkedBlockingDeque<>(maxConcurrency);
+
+    PooledHadoopShuffleWriteHandler handler =
+        new PooledHadoopShuffleWriteHandler(
+            deque, maxConcurrency, index -> new 
FakedShuffleWriteHandler(true));
+
+    // to check the initialization
+    for (int i = 0; i < maxConcurrency; i++) {
+      try {
+        handler.write(Collections.emptyList());
+        fail();
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+
+    // after initialization, the next writing will still fail due to the 
previous initialization
+    // fail.
+    for (int i = 0; i < maxConcurrency; i++) {
+      try {
+        handler.write(Collections.emptyList());
+        fail();
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+  }
+
   @Test
   public void lazyInitializeWriterHandlerTest() throws Exception {
     int maxConcurrency = 5;

Reply via email to