This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.10.bak in repository https://gitbox.apache.org/repos/asf/uniffle.git
commit d2faa95668c9e97f4892a7855074840975af96de Author: Junfan Zhang <zus...@apache.org> AuthorDate: Tue Dec 17 11:25:13 2024 +0800 [#2292] fix(server): Potential hang on HadoopShuffleWriteHandler initialization failure in PooledHadoopShuffleWriteHandler (#2293) ### What changes were proposed in this pull request? Correct the initialization flag to fix a potential hang in PooledHadoopShuffleWriteHandler. ### Why are the changes needed? fix: #2292 ### Does this PR introduce _any_ user-facing change? <!-- (Please list the user-facing changes introduced by your change, including 1. Change in user-facing APIs. 2. Addition or removal of property keys.) --> No. ### How was this patch tested? Unit tests --------- Co-authored-by: Junfan Zhang <zhangjun...@qiyi.com> --- .../impl/PooledHadoopShuffleWriteHandler.java | 12 ++++--- .../impl/PooledHadoopShuffleWriteHandlerTest.java | 39 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java index f46ebfdb2..f801516bf 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java @@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl; import java.util.Collection; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; @@ -49,7 +50,7 @@ public class PooledHadoopShuffleWriteHandler implements ShuffleWriteHandler { private final int maxConcurrency; private final String basePath; private Function<Integer, ShuffleWriteHandler> createWriterFunc; - private volatile int initializedHandlerCnt = 0; + private 
AtomicInteger initializedHandlerCntRef = new AtomicInteger(0); // Only for tests @VisibleForTesting @@ -109,10 +110,11 @@ public class PooledHadoopShuffleWriteHandler implements ShuffleWriteHandler { @Override public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws Exception { - if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) { + if (queue.isEmpty() && initializedHandlerCntRef.get() < maxConcurrency) { synchronized (this) { - if (initializedHandlerCnt < maxConcurrency) { - queue.add(createWriterFunc.apply(initializedHandlerCnt++)); + if (initializedHandlerCntRef.get() < maxConcurrency) { + queue.add(createWriterFunc.apply(initializedHandlerCntRef.get())); + initializedHandlerCntRef.addAndGet(1); } } } @@ -132,6 +134,6 @@ public class PooledHadoopShuffleWriteHandler implements ShuffleWriteHandler { @VisibleForTesting protected int getInitializedHandlerCnt() { - return initializedHandlerCnt; + return initializedHandlerCntRef.get(); } } diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java index 32e8786c8..460741db4 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java @@ -34,6 +34,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; public class PooledHadoopShuffleWriteHandlerTest { @@ -41,6 +42,7 @@ public class PooledHadoopShuffleWriteHandlerTest { private List<Integer> invokedList; private int index; private Runnable execution; + private boolean markInitializationFail = false; 
FakedShuffleWriteHandler(List<Integer> invokedList, int index, Runnable runnable) { this.invokedList = invokedList; @@ -48,6 +50,12 @@ public class PooledHadoopShuffleWriteHandlerTest { this.execution = runnable; } + FakedShuffleWriteHandler(boolean isMarkInitializationFail) { + if (isMarkInitializationFail) { + throw new RuntimeException("Fail to init"); + } + } + FakedShuffleWriteHandler( List<Integer> initializedList, List<Integer> invokedList, int index, Runnable runnable) { initializedList.add(index); @@ -63,6 +71,37 @@ public class PooledHadoopShuffleWriteHandlerTest { } } + @Test + public void initializationFailureTest() throws Exception { + int maxConcurrency = 2; + LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(maxConcurrency); + + PooledHadoopShuffleWriteHandler handler = + new PooledHadoopShuffleWriteHandler( + deque, maxConcurrency, index -> new FakedShuffleWriteHandler(true)); + + // to check the initialization + for (int i = 0; i < maxConcurrency; i++) { + try { + handler.write(Collections.emptyList()); + fail(); + } catch (Exception e) { + // ignore + } + } + + // after initialization, the next writing will still fail due to the previous initialization + // fail. + for (int i = 0; i < maxConcurrency; i++) { + try { + handler.write(Collections.emptyList()); + fail(); + } catch (Exception e) { + // ignore + } + } + } + @Test public void lazyInitializeWriterHandlerTest() throws Exception { int maxConcurrency = 5;