This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 612b1e477 [CELEBORN-1693] Fix storageFetcherPool concurrent problem
612b1e477 is described below

commit 612b1e477baa095829a02f04b701fcea33e21cd9
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 13:51:39 2024 +0800

    [CELEBORN-1693] Fix storageFetcherPool concurrent problem
---
 .../celeborn/service/deploy/worker/storage/CreditStreamManager.java    | 3 ++-
 .../celeborn/service/deploy/worker/storage/MapPartitionData.java       | 3 +--
 .../service/deploy/worker/storage/CreditStreamManagerSuiteJ.java       | 1 +
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index de6a3620b..afe09a567 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -43,7 +43,8 @@ public class CreditStreamManager {
   private final AtomicLong nextStreamId;
   private final ConcurrentHashMap<Long, StreamState> streams;
  private final ConcurrentHashMap<FileInfo, MapPartitionData> activeMapPartitions;
-  private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+  private final ConcurrentHashMap<String, ExecutorService> storageFetcherPool =
+      JavaUtils.newConcurrentHashMap();
   private int minReadBuffers;
   private int maxReadBuffers;
   private int threadsPerMountPoint;
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index e0c60c0b9..961dd34af 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
-import java.util.HashMap;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -68,7 +67,7 @@ class MapPartitionData implements MemoryManager.ReadBufferTargetChangeListener {
   public MapPartitionData(
       int minReadBuffers,
       int maxReadBuffers,
-      HashMap<String, ExecutorService> storageFetcherPool,
+      ConcurrentHashMap<String, ExecutorService> storageFetcherPool,
       int threadsPerMountPoint,
       DiskFileInfo diskFileInfo,
       Consumer<Long> recycleStream,
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
index 95064b90e..d886cc864 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
@@ -79,6 +79,7 @@ public class CreditStreamManagerSuiteJ {
         new DiskFileInfo(
             createTemporaryFileWithIndexFile(), new UserIdentifier("default", "default"), conf);
     MapFileMeta mapFileMeta = new MapFileMeta(1024, 10);
+    mapFileMeta.setMountPoint("/tmp");
     diskFileInfo.replaceFileMeta(mapFileMeta);
     Consumer<Long> streamIdConsumer = streamId -> Assert.assertTrue(streamId > 0);
 

Reply via email to