This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4e7df13af [CELEBORN-1693] Fix storageFetcherPool concurrent problem
4e7df13af is described below
commit 4e7df13af7db018a86fced467623062bfca8d1f2
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 13:51:39 2024 +0800
[CELEBORN-1693] Fix storageFetcherPool concurrent problem
### What changes were proposed in this pull request?
Fix a concurrency problem in storageFetcherPool.
Because of a race condition between multiple threads, duplicate thread pools may be created.

### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No new tests were needed; the existing CreditStreamManagerSuiteJ was updated to set a mount point.
Closes #2886 from reswqa/storageFetcherPool.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/service/deploy/worker/storage/CreditStreamManager.java | 3 ++-
.../celeborn/service/deploy/worker/storage/MapPartitionData.java | 3 +--
.../deploy/worker/storage/segment/SegmentMapPartitionData.java | 4 ++--
.../service/deploy/worker/storage/CreditStreamManagerSuiteJ.java | 1 +
4 files changed, 6 insertions(+), 5 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index 526998208..0a3736a38 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -44,7 +44,8 @@ public class CreditStreamManager {
private final AtomicLong nextStreamId;
private final ConcurrentHashMap<Long, StreamState> streams;
private final ConcurrentHashMap<FileInfo, MapPartitionData>
activeMapPartitions;
- private final HashMap<String, ExecutorService> storageFetcherPool = new
HashMap<>();
+ private final ConcurrentHashMap<String, ExecutorService> storageFetcherPool =
+ JavaUtils.newConcurrentHashMap();
private int minReadBuffers;
private int maxReadBuffers;
private int threadsPerMountPoint;
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index 10cd0332b..f94c17c95 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.channels.FileChannel;
-import java.util.HashMap;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,7 +67,7 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
public MapPartitionData(
int minReadBuffers,
int maxReadBuffers,
- HashMap<String, ExecutorService> storageFetcherPool,
+ ConcurrentHashMap<String, ExecutorService> storageFetcherPool,
int threadsPerMountPoint,
DiskFileInfo diskFileInfo,
Consumer<Long> recycleStream,
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
index 3d3b6cfe8..aa3090a3d 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
@@ -19,7 +19,7 @@
package org.apache.celeborn.service.deploy.worker.storage.segment;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
@@ -38,7 +38,7 @@ public class SegmentMapPartitionData extends MapPartitionData
{
public SegmentMapPartitionData(
int minReadBuffers,
int maxReadBuffers,
- HashMap<String, ExecutorService> storageFetcherPool,
+ ConcurrentHashMap<String, ExecutorService> storageFetcherPool,
int threadsPerMountPoint,
DiskFileInfo fileInfo,
Consumer<Long> recycleStream,
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
index 95064b90e..d886cc864 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
@@ -79,6 +79,7 @@ public class CreditStreamManagerSuiteJ {
new DiskFileInfo(
createTemporaryFileWithIndexFile(), new UserIdentifier("default",
"default"), conf);
MapFileMeta mapFileMeta = new MapFileMeta(1024, 10);
+ mapFileMeta.setMountPoint("/tmp");
diskFileInfo.replaceFileMeta(mapFileMeta);
Consumer<Long> streamIdConsumer = streamId -> Assert.assertTrue(streamId >
0);