This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b3bf68edc [CELEBORN-474][FOLLOWUP] Use JavaUtils#newConcurrentHashMap
to speed up ConcurrentHashMap#computeIfAbsent
b3bf68edc is described below
commit b3bf68edc797d1712208219dd31cc98939479984
Author: SteNicholas <[email protected]>
AuthorDate: Wed Oct 9 16:49:27 2024 +0800
[CELEBORN-474][FOLLOWUP] Use JavaUtils#newConcurrentHashMap to speed up
ConcurrentHashMap#computeIfAbsent
### What changes were proposed in this pull request?
Use `JavaUtils#newConcurrentHashMap` to speed up
`ConcurrentHashMap#computeIfAbsent`.
This is a follow-up to #1383.
### Why are the changes needed?
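Celeborn supports JDK 8, which is affected by the bug described in
[JDK-8161372](https://bugs.openjdk.org/browse/JDK-8161372): on JDK 8,
`ConcurrentHashMap#computeIfAbsent` locks the bin even when the key is
already present, which causes contention on frequently accessed keys.
Therefore, it's better to use `JavaUtils#newConcurrentHashMap`, which on
JDK 8 returns a map that checks key existence before falling back to
`ConcurrentHashMap#computeIfAbsent`, to speed up these lookups.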
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #2796 from SteNicholas/CELEBORN-474.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/plugin/flink/ShuffleResourceTracker.java | 8 ++++----
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java | 10 +++++++---
.../spark/shuffle/celeborn/CelebornShuffleReader.scala | 6 +++---
.../spark/shuffle/celeborn/CelebornShuffleReader.scala | 2 +-
.../apache/celeborn/client/RequestLocationCallContext.scala | 5 +++--
.../apache/celeborn/client/write/DataPushQueueSuiteJ.java | 3 +--
.../celeborn/common/network/sasl/SecretRegistryImpl.java | 4 +++-
.../main/java/org/apache/celeborn/common/util/JavaUtils.java | 12 ++++++++++++
.../apache/celeborn/common/ComputeIfAbsentBenchmark.scala | 4 ++--
.../celeborn/service/deploy/master/tags/TagsManager.scala | 3 ++-
16 files changed, 80 insertions(+), 37 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
index c024da246..357b7d84a 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.plugin.flink;
import java.util.*;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.JobID;
@@ -33,6 +32,7 @@ import
org.apache.celeborn.client.listener.WorkerStatusListener;
import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTracker implements WorkerStatusListener {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleResourceTracker.class);
@@ -40,7 +40,7 @@ public class ShuffleResourceTracker implements
WorkerStatusListener {
private final LifecycleManager lifecycleManager;
// JobID -> ShuffleResourceListener
private final Map<JobID, JobShuffleResourceListener>
shuffleResourceListeners =
- new ConcurrentHashMap<>();
+ JavaUtils.newConcurrentHashMap();
private static final int MAX_RETRY_TIMES = 3;
public ShuffleResourceTracker(
@@ -132,7 +132,7 @@ public class ShuffleResourceTracker implements
WorkerStatusListener {
private final ExecutorService executorService;
// celeborn shuffleId -> partitionId -> Flink ResultPartitionID
private Map<Integer, Map<Integer, ResultPartitionID>> resultPartitionMap =
- new ConcurrentHashMap<>();
+ JavaUtils.newConcurrentHashMap();
public JobShuffleResourceListener(
JobShuffleContext jobShuffleContext, ExecutorService executorService) {
@@ -143,7 +143,7 @@ public class ShuffleResourceTracker implements
WorkerStatusListener {
public void addPartitionResource(
int shuffleId, int partitionId, ResultPartitionID partitionID) {
Map<Integer, ResultPartitionID> shufflePartitionMap =
- resultPartitionMap.computeIfAbsent(shuffleId, (s) -> new
ConcurrentHashMap<>());
+ resultPartitionMap.computeIfAbsent(shuffleId, (s) ->
JavaUtils.newConcurrentHashMap());
shufflePartitionMap.put(partitionId, partitionID);
}
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
---
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
---
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
---
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
---
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
---
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
---
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index 85094d0cd..72334bb8b 100644
---
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -40,6 +40,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
public class ShuffleResourceTrackerSuiteJ {
@@ -48,14 +49,17 @@ public class ShuffleResourceTrackerSuiteJ {
LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
ScheduledThreadPoolExecutor executor =
Mockito.mock(ScheduledThreadPoolExecutor.class);
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+ JavaUtils.newConcurrentHashMap();
WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
map.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+ JavaUtils.newConcurrentHashMap();
map2.put(workerInfo, mockShufflePartitionLocationInfo());
- ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new
ConcurrentHashMap<>();
+ ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+ JavaUtils.newConcurrentHashMap();
map3.put(workerInfo, mockShufflePartitionLocationInfo());
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
map2, map3);
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 4a3092275..fb55e7418 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.shuffle.celeborn
import java.io.IOException
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.{Aggregator, InterruptibleIterator, TaskContext}
@@ -33,7 +33,7 @@ import org.apache.celeborn.client.read.CelebornInputStream
import org.apache.celeborn.client.read.MetricsCallback
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.{CelebornIOException,
PartitionUnRetryAbleException}
-import org.apache.celeborn.common.util.ThreadUtils
+import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}
class CelebornShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
@@ -88,7 +88,7 @@ class CelebornShuffleReader[K, C](
}
}
- val streams = new ConcurrentHashMap[Integer, CelebornInputStream]()
+ val streams = JavaUtils.newConcurrentHashMap[Integer,
CelebornInputStream]()
(startPartition until endPartition).map(partitionId => {
streamCreatorPool.submit(new Runnable {
override def run(): Unit = {
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 7cfa0a324..1b7b6f1dd 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -173,7 +173,7 @@ class CelebornShuffleReader[K, C](
val end = System.currentTimeMillis()
logInfo(s"BatchOpenStream for $partCnt cost ${end - startTime}ms")
- val streams = new ConcurrentHashMap[Integer, CelebornInputStream]()
+ val streams = JavaUtils.newConcurrentHashMap[Integer,
CelebornInputStream]()
def createInputStream(partitionId: Int): Unit = {
val locations =
diff --git
a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
index d5e6c4c36..d306546b3 100644
---
a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
@@ -18,13 +18,13 @@
package org.apache.celeborn.client
import java.util
-import java.util.concurrent.ConcurrentHashMap
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.PartitionLocation
import
org.apache.celeborn.common.protocol.message.ControlMessages.{ChangeLocationResponse,
RegisterShuffleResponse}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.rpc.RpcCallContext
+import org.apache.celeborn.common.util.JavaUtils
trait RequestLocationCallContext {
def reply(
@@ -40,7 +40,8 @@ case class ChangeLocationsCallContext(
extends RequestLocationCallContext with Logging {
val endedMapIds = new util.HashSet[Integer]()
val newLocs =
- new ConcurrentHashMap[Integer, (StatusCode, Boolean,
PartitionLocation)](partitionCount)
+ JavaUtils.newConcurrentHashMap[Integer, (StatusCode, Boolean,
PartitionLocation)](
+ partitionCount)
def markMapperEnd(mapId: Int): Unit = this.synchronized {
endedMapIds.add(mapId)
diff --git
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index 284ba3148..99a27eb3d 100644
---
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
+++
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.junit.AfterClass;
@@ -76,7 +75,7 @@ public class DataPushQueueSuiteJ {
tarWorkerData.add(new ArrayList<>());
}
- Map<Integer, Integer> partitionBatchIdMap = new ConcurrentHashMap<>();
+ Map<Integer, Integer> partitionBatchIdMap =
JavaUtils.newConcurrentHashMap();
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER().key(),
"2");
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
index d647ade5c..66ef48376 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
@@ -19,10 +19,12 @@ package org.apache.celeborn.common.network.sasl;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.celeborn.common.util.JavaUtils;
+
/** A simple implementation of {@link SecretRegistry} that stores secrets in
memory. */
public class SecretRegistryImpl implements SecretRegistry {
- private final ConcurrentHashMap<String, String> secrets = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, String> secrets =
JavaUtils.newConcurrentHashMap();
@Override
public void register(String appId, String secret) {
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 258a3fd40..9f1d106ce 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -460,6 +460,14 @@ public class JavaUtils {
}
}
+ public static <K, V> ConcurrentHashMap<K, V> newConcurrentHashMap(int
initialCapacity) {
+ if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+ return new ConcurrentHashMap<>(initialCapacity);
+ } else {
+ return new ConcurrentHashMapForJDK8<>(initialCapacity);
+ }
+ }
+
/**
* For JDK8, there is bug for ConcurrentHashMap#computeIfAbsent, checking
the key existence to
* speed up. See details in CELEBORN-474.
@@ -469,6 +477,10 @@ public class JavaUtils {
super();
}
+ public ConcurrentHashMapForJDK8(int initialCapacity) {
+ super(initialCapacity);
+ }
+
public ConcurrentHashMapForJDK8(Map<? extends K, ? extends V> m) {
super(m);
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
b/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
index eb347e64f..f80eea6fd 100644
---
a/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
@@ -18,13 +18,13 @@
package org.apache.celeborn.common
import java.util.{HashMap => JHashMap, Map => JMap}
-import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}
import scala.util.Random
import org.apache.celeborn.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.celeborn.common.util.JavaUtils
/**
* ComputeIfAbsent benchmark.
@@ -66,6 +66,6 @@ object ComputeIfAbsentBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
test("HashMap", new JHashMap[Int, AtomicInteger], 1 << 26)
- test("ConcurrentHashMap", new ConcurrentHashMap[Int, AtomicInteger], 1 <<
26)
+ test("ConcurrentHashMap", JavaUtils.newConcurrentHashMap[Int,
AtomicInteger], 1 << 26)
}
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
index 6dd6e43e2..52c3f48fa 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -25,9 +25,10 @@ import
scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsScalaConc
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.util.JavaUtils
class TagsManager extends Logging {
- private val tagStore = new ConcurrentHashMap[String, JSet[String]]()
+ private val tagStore = JavaUtils.newConcurrentHashMap[String, JSet[String]]()
private val addNewTagFunc =
new util.function.Function[String, ConcurrentHashMap.KeySetView[String,
java.lang.Boolean]]() {