This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 612b1e477 [CELEBORN-1693] Fix storageFetcherPool concurrent problem
612b1e477 is described below
commit 612b1e477baa095829a02f04b701fcea33e21cd9
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 13:51:39 2024 +0800
[CELEBORN-1693] Fix storageFetcherPool concurrent problem
---
.../celeborn/service/deploy/worker/storage/CreditStreamManager.java | 3 ++-
.../celeborn/service/deploy/worker/storage/MapPartitionData.java | 3 +--
.../service/deploy/worker/storage/CreditStreamManagerSuiteJ.java | 1 +
3 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index de6a3620b..afe09a567 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -43,7 +43,8 @@ public class CreditStreamManager {
private final AtomicLong nextStreamId;
private final ConcurrentHashMap<Long, StreamState> streams;
private final ConcurrentHashMap<FileInfo, MapPartitionData> activeMapPartitions;
- private final HashMap<String, ExecutorService> storageFetcherPool = new HashMap<>();
+ private final ConcurrentHashMap<String, ExecutorService> storageFetcherPool =
+ JavaUtils.newConcurrentHashMap();
private int minReadBuffers;
private int maxReadBuffers;
private int threadsPerMountPoint;
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index e0c60c0b9..961dd34af 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.IOException;
import java.nio.channels.FileChannel;
-import java.util.HashMap;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,7 +67,7 @@ class MapPartitionData implements MemoryManager.ReadBufferTargetChangeListener {
public MapPartitionData(
int minReadBuffers,
int maxReadBuffers,
- HashMap<String, ExecutorService> storageFetcherPool,
+ ConcurrentHashMap<String, ExecutorService> storageFetcherPool,
int threadsPerMountPoint,
DiskFileInfo diskFileInfo,
Consumer<Long> recycleStream,
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
index 95064b90e..d886cc864 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java
@@ -79,6 +79,7 @@ public class CreditStreamManagerSuiteJ {
new DiskFileInfo(
createTemporaryFileWithIndexFile(), new UserIdentifier("default", "default"), conf);
MapFileMeta mapFileMeta = new MapFileMeta(1024, 10);
+ mapFileMeta.setMountPoint("/tmp");
diskFileInfo.replaceFileMeta(mapFileMeta);
Consumer<Long> streamIdConsumer = streamId -> Assert.assertTrue(streamId > 0);