This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e7df13af [CELEBORN-1693] Fix storageFetcherPool concurrent problem
4e7df13af is described below

commit 4e7df13af7db018a86fced467623062bfca8d1f2
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 13:51:39 2024 +0800

    [CELEBORN-1693] Fix storageFetcherPool concurrent problem
    
    ### What changes were proposed in this pull request?
    
    Fix a concurrency problem in storageFetcherPool.
    
    Duplicate thread pools may be created because of a race condition between multiple threads.
    
    
![image](https://github.com/user-attachments/assets/ba4b0964-700e-4502-933a-b6c7cb93f32d)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No need.
    
    Closes #2886 from reswqa/storageFetcherPool.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../celeborn/service/deploy/worker/storage/CreditStreamManager.java   | 3 ++-
 .../celeborn/service/deploy/worker/storage/MapPartitionData.java      | 3 +--
 .../deploy/worker/storage/segment/SegmentMapPartitionData.java        | 4 ++--
 .../service/deploy/worker/storage/CreditStreamManagerSuiteJ.java      | 1 +
 4 files changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index 526998208..0a3736a38 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -44,7 +44,8 @@ public class CreditStreamManager {
   private final AtomicLong nextStreamId;
   private final ConcurrentHashMap<Long, StreamState> streams;
   private final ConcurrentHashMap<FileInfo, MapPartitionData> 
activeMapPartitions;
-  private final HashMap<String, ExecutorService> storageFetcherPool = new 
HashMap<>();
+  private final ConcurrentHashMap<String, ExecutorService> storageFetcherPool =
+      JavaUtils.newConcurrentHashMap();
   private int minReadBuffers;
   private int maxReadBuffers;
   private int threadsPerMountPoint;
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index 10cd0332b..f94c17c95 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
-import java.util.HashMap;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -68,7 +67,7 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
   public MapPartitionData(
       int minReadBuffers,
       int maxReadBuffers,
-      HashMap<String, ExecutorService> storageFetcherPool,
+      ConcurrentHashMap<String, ExecutorService> storageFetcherPool,
       int threadsPerMountPoint,
       DiskFileInfo diskFileInfo,
       Consumer<Long> recycleStream,
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
index 3d3b6cfe8..aa3090a3d 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java
@@ -19,7 +19,7 @@
 package org.apache.celeborn.service.deploy.worker.storage.segment;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 
@@ -38,7 +38,7 @@ public class SegmentMapPartitionData extends MapPartitionData 
{
   public SegmentMapPartitionData(
       int minReadBuffers,
       int maxReadBuffers,
-      HashMap<String, ExecutorService> storageFetcherPool,
+      ConcurrentHashMap<String, ExecutorService> storageFetcherPool,
       int threadsPerMountPoint,
       DiskFileInfo fileInfo,
       Consumer<Long> recycleStream,
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
index 95064b90e..d886cc864 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
@@ -79,6 +79,7 @@ public class CreditStreamManagerSuiteJ {
         new DiskFileInfo(
             createTemporaryFileWithIndexFile(), new UserIdentifier("default", 
"default"), conf);
     MapFileMeta mapFileMeta = new MapFileMeta(1024, 10);
+    mapFileMeta.setMountPoint("/tmp");
     diskFileInfo.replaceFileMeta(mapFileMeta);
     Consumer<Long> streamIdConsumer = streamId -> Assert.assertTrue(streamId > 
0);
 

Reply via email to