This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 23e0a51e [#416] feat(hdfs): lazy initialization of 
hdfsShuffleWriteHandler when per-partition concurrent write is enabled (#816)
23e0a51e is described below

commit 23e0a51e380d368a829b18386022d02ab6cb4076
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Apr 17 11:32:25 2023 +0800

    [#416] feat(hdfs): lazy initialization of hdfsShuffleWriteHandler when 
per-partition concurrent write is enabled (#816)
    
    ### What changes were proposed in this pull request?
     lazy initialization of `hdfsShuffleWriteHandler` when per-partition 
concurrent write is enabled
    
    ### Why are the changes needed?
    Without this PR, it will create too many unnecessary `writerHandlers` when
    per-partition concurrent write is enabled and there is no race condition.
    
    This PR is to reduce unnecessary handler creation.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    1. UTs
---
 .../impl/PooledHdfsShuffleWriteHandler.java        | 60 +++++++++++++++-------
 .../impl/PooledHdfsShuffleWriteHandlerTest.java    | 54 +++++++++++++++++++
 2 files changed, 95 insertions(+), 19 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
index bb891a88..1cf85d8e 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl;
 
 import java.util.List;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
@@ -45,6 +46,8 @@ public class PooledHdfsShuffleWriteHandler implements 
ShuffleWriteHandler {
   private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
   private final int maxConcurrency;
   private final String basePath;
+  private Function<Integer, ShuffleWriteHandler> createWriterFunc;
+  private volatile int initializedHandlerCnt = 0;
 
   // Only for tests
   @VisibleForTesting
@@ -54,6 +57,17 @@ public class PooledHdfsShuffleWriteHandler implements 
ShuffleWriteHandler {
     this.basePath = StringUtils.EMPTY;
   }
 
+  @VisibleForTesting
+  public PooledHdfsShuffleWriteHandler(
+      LinkedBlockingDeque<ShuffleWriteHandler> queue,
+      int maxConcurrency,
+      Function<Integer, ShuffleWriteHandler> createWriterFunc) {
+    this.queue = queue;
+    this.maxConcurrency = maxConcurrency;
+    this.basePath = StringUtils.EMPTY;
+    this.createWriterFunc = createWriterFunc;
+  }
+
   public PooledHdfsShuffleWriteHandler(
       String appId,
       int shuffleId,
@@ -70,31 +84,34 @@ public class PooledHdfsShuffleWriteHandler implements 
ShuffleWriteHandler {
     this.basePath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
         ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, 
startPartition, endPartition));
 
-    // todo: support init lazily
-    try {
-      for (int i = 0; i < maxConcurrency; i++) {
-        // Use add() here because we are sure the capacity will not be 
exceeded.
-        // Note: add() throws IllegalStateException when queue is full.
-        queue.add(
-            new HdfsShuffleWriteHandler(
-                appId,
-                shuffleId,
-                startPartition,
-                endPartition,
-                storageBasePath,
-                fileNamePrefix + "_" + i,
-                hadoopConf,
-                user
-            )
+    this.createWriterFunc = index -> {
+      try {
+        return new HdfsShuffleWriteHandler(
+            appId,
+            shuffleId,
+            startPartition,
+            endPartition,
+            storageBasePath,
+            fileNamePrefix + "_" + index,
+            hadoopConf,
+            user
         );
+      } catch (Exception e) {
+        throw new RssException("Errors on initializing Hdfs writer handler.", 
e);
       }
-    } catch (Exception e) {
-      throw new RssException("Errors on initializing Hdfs writer handler.", e);
-    }
+    };
   }
 
   @Override
   public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
+    if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
+      synchronized (this) {
+        if (initializedHandlerCnt < maxConcurrency) {
+          queue.add(createWriterFunc.apply(initializedHandlerCnt++));
+        }
+      }
+    }
+
     if (queue.isEmpty()) {
       LOGGER.warn("No free hdfs writer handler, it will wait. storage path: 
{}", basePath);
     }
@@ -107,4 +124,9 @@ public class PooledHdfsShuffleWriteHandler implements 
ShuffleWriteHandler {
       queue.addFirst(writeHandler);
     }
   }
+
+  @VisibleForTesting
+  protected int getInitializedHandlerCnt() {
+    return initializedHandlerCnt;
+  }
 }
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
index ca62b835..e3d655b2 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.storage.handler.impl;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -46,6 +47,13 @@ public class PooledHdfsShuffleWriteHandlerTest {
       this.execution = runnable;
     }
 
+    FakedShuffleWriteHandler(List<Integer> initializedList, List<Integer> 
invokedList, int index, Runnable runnable) {
+      initializedList.add(index);
+      this.invokedList = invokedList;
+      this.index = index;
+      this.execution = runnable;
+    }
+
     @Override
     public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
       execution.run();
@@ -53,6 +61,52 @@ public class PooledHdfsShuffleWriteHandlerTest {
     }
   }
 
+  @Test
+  public void lazyInitializeWriterHandlerTest() throws Exception {
+    int maxConcurrency = 5;
+    LinkedBlockingDeque deque = new LinkedBlockingDeque(maxConcurrency);
+
+    CopyOnWriteArrayList<Integer> invokedList = new CopyOnWriteArrayList<>();
+    CopyOnWriteArrayList<Integer> initializedList = new 
CopyOnWriteArrayList<>();
+
+    PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(
+        deque,
+        maxConcurrency,
+        index -> new FakedShuffleWriteHandler(initializedList, invokedList, 
index, () -> {
+          try {
+            Thread.sleep(10);
+          } catch (Exception e) {
+            // ignore
+          }
+        })
+    );
+
+    // case1: no race condition
+    for (int i = 0; i < 10; i++) {
+      handler.write(Collections.emptyList());
+      assertEquals(1, initializedList.size());
+    }
+
+    // case2: initialized by multi threads
+    invokedList.clear();
+    CountDownLatch latch = new CountDownLatch(100);
+    for (int i = 0; i < 100; i++) {
+      new Thread(() -> {
+        try {
+          handler.write(Collections.emptyList());
+        } catch (Exception e) {
+          // ignore
+        } finally {
+          latch.countDown();
+        }
+      }).start();
+    }
+    latch.await();
+    assertEquals(100, invokedList.size());
+    assertEquals(5, initializedList.size());
+    assertEquals(5, handler.getInitializedHandlerCnt());
+  }
+
   @Test
   public void writeSameFileWhenNoRaceCondition() throws Exception {
     int concurrency = 5;

Reply via email to