This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 23e0a51e [#416] feat(hdfs): lazy initialization of
hdfsShuffleWriteHandler when per-partition concurrent write is enabled (#816)
23e0a51e is described below
commit 23e0a51e380d368a829b18386022d02ab6cb4076
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Apr 17 11:32:25 2023 +0800
[#416] feat(hdfs): lazy initialization of hdfsShuffleWriteHandler when
per-partition concurrent write is enabled (#816)
### What changes were proposed in this pull request?
lazy initialization of `hdfsShuffleWriteHandler` when per-partition
concurrent write is enabled
### Why are the changes needed?
Without this PR, too many unnecessary `writerHandlers` are created when
per-partition concurrent write is enabled and there is no race condition.
This PR reduces unnecessary handler creation by initializing handlers lazily.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1. UTs
---
.../impl/PooledHdfsShuffleWriteHandler.java | 60 +++++++++++++++-------
.../impl/PooledHdfsShuffleWriteHandlerTest.java | 54 +++++++++++++++++++
2 files changed, 95 insertions(+), 19 deletions(-)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
index bb891a88..1cf85d8e 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
@@ -45,6 +46,8 @@ public class PooledHdfsShuffleWriteHandler implements
ShuffleWriteHandler {
private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
private final int maxConcurrency;
private final String basePath;
+ private Function<Integer, ShuffleWriteHandler> createWriterFunc;
+ private volatile int initializedHandlerCnt = 0;
// Only for tests
@VisibleForTesting
@@ -54,6 +57,17 @@ public class PooledHdfsShuffleWriteHandler implements
ShuffleWriteHandler {
this.basePath = StringUtils.EMPTY;
}
+ @VisibleForTesting
+ public PooledHdfsShuffleWriteHandler(
+ LinkedBlockingDeque<ShuffleWriteHandler> queue,
+ int maxConcurrency,
+ Function<Integer, ShuffleWriteHandler> createWriterFunc) {
+ this.queue = queue;
+ this.maxConcurrency = maxConcurrency;
+ this.basePath = StringUtils.EMPTY;
+ this.createWriterFunc = createWriterFunc;
+ }
+
public PooledHdfsShuffleWriteHandler(
String appId,
int shuffleId,
@@ -70,31 +84,34 @@ public class PooledHdfsShuffleWriteHandler implements
ShuffleWriteHandler {
this.basePath =
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId,
startPartition, endPartition));
- // todo: support init lazily
- try {
- for (int i = 0; i < maxConcurrency; i++) {
- // Use add() here because we are sure the capacity will not be
exceeded.
- // Note: add() throws IllegalStateException when queue is full.
- queue.add(
- new HdfsShuffleWriteHandler(
- appId,
- shuffleId,
- startPartition,
- endPartition,
- storageBasePath,
- fileNamePrefix + "_" + i,
- hadoopConf,
- user
- )
+ this.createWriterFunc = index -> {
+ try {
+ return new HdfsShuffleWriteHandler(
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ storageBasePath,
+ fileNamePrefix + "_" + index,
+ hadoopConf,
+ user
);
+ } catch (Exception e) {
+ throw new RssException("Errors on initializing Hdfs writer handler.",
e);
}
- } catch (Exception e) {
- throw new RssException("Errors on initializing Hdfs writer handler.", e);
- }
+ };
}
@Override
public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
+ if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
+ synchronized (this) {
+ if (initializedHandlerCnt < maxConcurrency) {
+ queue.add(createWriterFunc.apply(initializedHandlerCnt++));
+ }
+ }
+ }
+
if (queue.isEmpty()) {
LOGGER.warn("No free hdfs writer handler, it will wait. storage path:
{}", basePath);
}
@@ -107,4 +124,9 @@ public class PooledHdfsShuffleWriteHandler implements
ShuffleWriteHandler {
queue.addFirst(writeHandler);
}
}
+
+ @VisibleForTesting
+ protected int getInitializedHandlerCnt() {
+ return initializedHandlerCnt;
+ }
}
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
index ca62b835..e3d655b2 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.storage.handler.impl;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -46,6 +47,13 @@ public class PooledHdfsShuffleWriteHandlerTest {
this.execution = runnable;
}
+ FakedShuffleWriteHandler(List<Integer> initializedList, List<Integer>
invokedList, int index, Runnable runnable) {
+ initializedList.add(index);
+ this.invokedList = invokedList;
+ this.index = index;
+ this.execution = runnable;
+ }
+
@Override
public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
execution.run();
@@ -53,6 +61,52 @@ public class PooledHdfsShuffleWriteHandlerTest {
}
}
+ @Test
+ public void lazyInitializeWriterHandlerTest() throws Exception {
+ int maxConcurrency = 5;
+ LinkedBlockingDeque deque = new LinkedBlockingDeque(maxConcurrency);
+
+ CopyOnWriteArrayList<Integer> invokedList = new CopyOnWriteArrayList<>();
+ CopyOnWriteArrayList<Integer> initializedList = new
CopyOnWriteArrayList<>();
+
+ PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(
+ deque,
+ maxConcurrency,
+ index -> new FakedShuffleWriteHandler(initializedList, invokedList,
index, () -> {
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ // ignore
+ }
+ })
+ );
+
+ // case1: no race condition
+ for (int i = 0; i < 10; i++) {
+ handler.write(Collections.emptyList());
+ assertEquals(1, initializedList.size());
+ }
+
+ // case2: initialized by multi threads
+ invokedList.clear();
+ CountDownLatch latch = new CountDownLatch(100);
+ for (int i = 0; i < 100; i++) {
+ new Thread(() -> {
+ try {
+ handler.write(Collections.emptyList());
+ } catch (Exception e) {
+ // ignore
+ } finally {
+ latch.countDown();
+ }
+ }).start();
+ }
+ latch.await();
+ assertEquals(100, invokedList.size());
+ assertEquals(5, initializedList.size());
+ assertEquals(5, handler.getInitializedHandlerCnt());
+ }
+
@Test
public void writeSameFileWhenNoRaceCondition() throws Exception {
int concurrency = 5;