This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b3bf68edc [CELEBORN-474][FOLLOWUP] Use JavaUtils#newConcurrentHashMap 
to speed up ConcurrentHashMap#computeIfAbsent
b3bf68edc is described below

commit b3bf68edc797d1712208219dd31cc98939479984
Author: SteNicholas <[email protected]>
AuthorDate: Wed Oct 9 16:49:27 2024 +0800

    [CELEBORN-474][FOLLOWUP] Use JavaUtils#newConcurrentHashMap to speed up 
ConcurrentHashMap#computeIfAbsent
    
    ### What changes were proposed in this pull request?
    
    Use `JavaUtils#newConcurrentHashMap` to speed up 
`ConcurrentHashMap#computeIfAbsent`.
    
    Follow up #1383.
    
    ### Why are the changes needed?
    
    Celeborn supports JDK 8, which is affected by the bug described in 
[JDK-8161372](https://bugs.openjdk.org/browse/JDK-8161372). Therefore, it is 
better to create maps with `JavaUtils#newConcurrentHashMap`, which speeds up 
`ConcurrentHashMap#computeIfAbsent`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #2796 from SteNicholas/CELEBORN-474.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/plugin/flink/ShuffleResourceTracker.java |  8 ++++----
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java  | 10 +++++++---
 .../spark/shuffle/celeborn/CelebornShuffleReader.scala       |  6 +++---
 .../spark/shuffle/celeborn/CelebornShuffleReader.scala       |  2 +-
 .../apache/celeborn/client/RequestLocationCallContext.scala  |  5 +++--
 .../apache/celeborn/client/write/DataPushQueueSuiteJ.java    |  3 +--
 .../celeborn/common/network/sasl/SecretRegistryImpl.java     |  4 +++-
 .../main/java/org/apache/celeborn/common/util/JavaUtils.java | 12 ++++++++++++
 .../apache/celeborn/common/ComputeIfAbsentBenchmark.scala    |  4 ++--
 .../celeborn/service/deploy/master/tags/TagsManager.scala    |  3 ++-
 16 files changed, 80 insertions(+), 37 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
index c024da246..357b7d84a 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/ShuffleResourceTracker.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.plugin.flink;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.flink.api.common.JobID;
@@ -33,6 +32,7 @@ import 
org.apache.celeborn.client.listener.WorkerStatusListener;
 import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTracker implements WorkerStatusListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleResourceTracker.class);
@@ -40,7 +40,7 @@ public class ShuffleResourceTracker implements 
WorkerStatusListener {
   private final LifecycleManager lifecycleManager;
   // JobID -> ShuffleResourceListener
   private final Map<JobID, JobShuffleResourceListener> 
shuffleResourceListeners =
-      new ConcurrentHashMap<>();
+      JavaUtils.newConcurrentHashMap();
   private static final int MAX_RETRY_TIMES = 3;
 
   public ShuffleResourceTracker(
@@ -132,7 +132,7 @@ public class ShuffleResourceTracker implements 
WorkerStatusListener {
     private final ExecutorService executorService;
     // celeborn shuffleId -> partitionId -> Flink ResultPartitionID
     private Map<Integer, Map<Integer, ResultPartitionID>> resultPartitionMap =
-        new ConcurrentHashMap<>();
+        JavaUtils.newConcurrentHashMap();
 
     public JobShuffleResourceListener(
         JobShuffleContext jobShuffleContext, ExecutorService executorService) {
@@ -143,7 +143,7 @@ public class ShuffleResourceTracker implements 
WorkerStatusListener {
     public void addPartitionResource(
         int shuffleId, int partitionId, ResultPartitionID partitionID) {
       Map<Integer, ResultPartitionID> shufflePartitionMap =
-          resultPartitionMap.computeIfAbsent(shuffleId, (s) -> new 
ConcurrentHashMap<>());
+          resultPartitionMap.computeIfAbsent(shuffleId, (s) -> 
JavaUtils.newConcurrentHashMap());
       shufflePartitionMap.put(partitionId, partitionID);
     }
 
diff --git 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
--- 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
--- 
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
--- 
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
--- 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
--- 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index d77534af9..58e1a0a97 100644
--- 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -45,14 +46,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
 
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
index 85094d0cd..72334bb8b 100644
--- 
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
+++ 
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/ShuffleResourceTrackerSuiteJ.java
@@ -40,6 +40,7 @@ import org.apache.celeborn.client.listener.WorkersStatus;
 import org.apache.celeborn.common.meta.ShufflePartitionLocationInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.util.JavaUtils;
 
 public class ShuffleResourceTrackerSuiteJ {
 
@@ -48,14 +49,17 @@ public class ShuffleResourceTrackerSuiteJ {
     LifecycleManager lifecycleManager = Mockito.mock(LifecycleManager.class);
     ScheduledThreadPoolExecutor executor = 
Mockito.mock(ScheduledThreadPoolExecutor.class);
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map =
+        JavaUtils.newConcurrentHashMap();
     WorkerInfo workerInfo = new WorkerInfo("mock", -1, -1, -1, -1);
     map.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map2 =
+        JavaUtils.newConcurrentHashMap();
     map2.put(workerInfo, mockShufflePartitionLocationInfo());
 
-    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<WorkerInfo, ShufflePartitionLocationInfo> map3 =
+        JavaUtils.newConcurrentHashMap();
     map3.put(workerInfo, mockShufflePartitionLocationInfo());
 
     
Mockito.when(lifecycleManager.workerSnapshots(Mockito.anyInt())).thenReturn(map,
 map2, map3);
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 4a3092275..fb55e7418 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.shuffle.celeborn
 
 import java.io.IOException
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark.{Aggregator, InterruptibleIterator, TaskContext}
@@ -33,7 +33,7 @@ import org.apache.celeborn.client.read.CelebornInputStream
 import org.apache.celeborn.client.read.MetricsCallback
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.{CelebornIOException, 
PartitionUnRetryAbleException}
-import org.apache.celeborn.common.util.ThreadUtils
+import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}
 
 class CelebornShuffleReader[K, C](
     handle: CelebornShuffleHandle[K, _, C],
@@ -88,7 +88,7 @@ class CelebornShuffleReader[K, C](
       }
     }
 
-    val streams = new ConcurrentHashMap[Integer, CelebornInputStream]()
+    val streams = JavaUtils.newConcurrentHashMap[Integer, 
CelebornInputStream]()
     (startPartition until endPartition).map(partitionId => {
       streamCreatorPool.submit(new Runnable {
         override def run(): Unit = {
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 7cfa0a324..1b7b6f1dd 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -173,7 +173,7 @@ class CelebornShuffleReader[K, C](
     val end = System.currentTimeMillis()
     logInfo(s"BatchOpenStream for $partCnt cost ${end - startTime}ms")
 
-    val streams = new ConcurrentHashMap[Integer, CelebornInputStream]()
+    val streams = JavaUtils.newConcurrentHashMap[Integer, 
CelebornInputStream]()
 
     def createInputStream(partitionId: Int): Unit = {
       val locations =
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
 
b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
index d5e6c4c36..d306546b3 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/RequestLocationCallContext.scala
@@ -18,13 +18,13 @@
 package org.apache.celeborn.client
 
 import java.util
-import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.protocol.PartitionLocation
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.{ChangeLocationResponse,
 RegisterShuffleResponse}
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.rpc.RpcCallContext
+import org.apache.celeborn.common.util.JavaUtils
 
 trait RequestLocationCallContext {
   def reply(
@@ -40,7 +40,8 @@ case class ChangeLocationsCallContext(
   extends RequestLocationCallContext with Logging {
   val endedMapIds = new util.HashSet[Integer]()
   val newLocs =
-    new ConcurrentHashMap[Integer, (StatusCode, Boolean, 
PartitionLocation)](partitionCount)
+    JavaUtils.newConcurrentHashMap[Integer, (StatusCode, Boolean, 
PartitionLocation)](
+      partitionCount)
 
   def markMapperEnd(mapId: Int): Unit = this.synchronized {
     endedMapIds.add(mapId)
diff --git 
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
 
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index 284ba3148..99a27eb3d 100644
--- 
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
+++ 
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.junit.AfterClass;
@@ -76,7 +75,7 @@ public class DataPushQueueSuiteJ {
       tarWorkerData.add(new ArrayList<>());
     }
 
-    Map<Integer, Integer> partitionBatchIdMap = new ConcurrentHashMap<>();
+    Map<Integer, Integer> partitionBatchIdMap = 
JavaUtils.newConcurrentHashMap();
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER().key(), 
"2");
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
 
b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
index d647ade5c..66ef48376 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
@@ -19,10 +19,12 @@ package org.apache.celeborn.common.network.sasl;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.celeborn.common.util.JavaUtils;
+
 /** A simple implementation of {@link SecretRegistry} that stores secrets in 
memory. */
 public class SecretRegistryImpl implements SecretRegistry {
 
-  private final ConcurrentHashMap<String, String> secrets = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, String> secrets = 
JavaUtils.newConcurrentHashMap();
 
   @Override
   public void register(String appId, String secret) {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 258a3fd40..9f1d106ce 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -460,6 +460,14 @@ public class JavaUtils {
     }
   }
 
+  public static <K, V> ConcurrentHashMap<K, V> newConcurrentHashMap(int 
initialCapacity) {
+    if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+      return new ConcurrentHashMap<>(initialCapacity);
+    } else {
+      return new ConcurrentHashMapForJDK8<>(initialCapacity);
+    }
+  }
+
   /**
    * For JDK8, there is bug for ConcurrentHashMap#computeIfAbsent, checking 
the key existence to
    * speed up. See details in CELEBORN-474.
@@ -469,6 +477,10 @@ public class JavaUtils {
       super();
     }
 
+    public ConcurrentHashMapForJDK8(int initialCapacity) {
+      super(initialCapacity);
+    }
+
     public ConcurrentHashMapForJDK8(Map<? extends K, ? extends V> m) {
       super(m);
     }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
 
b/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
index eb347e64f..f80eea6fd 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/ComputeIfAbsentBenchmark.scala
@@ -18,13 +18,13 @@
 package org.apache.celeborn.common
 
 import java.util.{HashMap => JHashMap, Map => JMap}
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.function.{Function => JFunction}
 
 import scala.util.Random
 
 import org.apache.celeborn.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.celeborn.common.util.JavaUtils
 
 /**
  * ComputeIfAbsent benchmark.
@@ -66,6 +66,6 @@ object ComputeIfAbsentBenchmark extends BenchmarkBase {
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     test("HashMap", new JHashMap[Int, AtomicInteger], 1 << 26)
-    test("ConcurrentHashMap", new ConcurrentHashMap[Int, AtomicInteger], 1 << 
26)
+    test("ConcurrentHashMap", JavaUtils.newConcurrentHashMap[Int, 
AtomicInteger], 1 << 26)
   }
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
index 6dd6e43e2..52c3f48fa 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
@@ -25,9 +25,10 @@ import 
scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsScalaConc
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.WorkerInfo
+import org.apache.celeborn.common.util.JavaUtils
 
 class TagsManager extends Logging {
-  private val tagStore = new ConcurrentHashMap[String, JSet[String]]()
+  private val tagStore = JavaUtils.newConcurrentHashMap[String, JSet[String]]()
 
   private val addNewTagFunc =
     new util.function.Function[String, ConcurrentHashMap.KeySetView[String, 
java.lang.Boolean]]() {

Reply via email to