This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c3cbdec4a [#519][FOLLOWUP] improvement: Speed up 
ConcurrentHashMap#computeIfAbsent (#1719)
c3cbdec4a is described below

commit c3cbdec4a099efbba0a13d2cc53aafd9493ae498
Author: RickyMa <[email protected]>
AuthorDate: Fri May 17 20:22:19 2024 +0800

    [#519][FOLLOWUP] improvement: Speed up ConcurrentHashMap#computeIfAbsent 
(#1719)
    
    ### What changes were proposed in this pull request?
    
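    Speed up ConcurrentHashMap#computeIfAbsent on JDK 8 by first checking whether the key already exists. Maps are now created through JavaUtils.newConcurrentMap so the JDK 8 workaround is applied consistently.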
    
    ### Why are the changes needed?
    
    A follow-up PR for https://github.com/apache/incubator-uniffle/issues/519.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 .../org/apache/tez/dag/app/TezRemoteShuffleManager.java |  4 ++--
 .../library/common/shuffle/impl/RssShuffleManager.java  | 14 +++++++-------
 .../shuffle/orderedgrouped/RssShuffleScheduler.java     |  6 +++---
 .../java/org/apache/uniffle/common/config/RssConf.java  |  7 ++++---
 .../common/netty/handle/TransportResponseHandler.java   |  4 ++--
 .../java/org/apache/uniffle/common/util/JavaUtils.java  | 17 +++++++++++++++++
 .../apache/uniffle/coordinator/QuotaManagerTest.java    |  8 ++++----
 .../uniffle/server/buffer/ShuffleBufferManager.java     |  3 +--
 8 files changed, 40 insertions(+), 23 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 59bfc8f19..b37d9486f 100644
--- 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
@@ -64,6 +63,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 
 import static 
org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;
@@ -154,7 +154,7 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
 
   private class TezRemoteShuffleUmbilicalProtocolImpl implements 
TezRemoteShuffleUmbilicalProtocol {
     private Map<Integer, ShuffleAssignmentsInfo> shuffleIdToShuffleAssignsInfo 
=
-        new ConcurrentHashMap<>();
+        JavaUtils.newConcurrentMap();
 
     @Override
     public long getProtocolVersion(String s, long l) throws IOException {
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
index bcd089b4f..311d09f68 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -100,6 +99,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.JavaUtils;
 
 // This only knows how to deal with a single srcIndex for a given targetIndex.
 // In case the src task generates multiple outputs for the same target Index
@@ -211,9 +211,9 @@ public class RssShuffleManager extends ShuffleManager {
   private final BlockingQueue<Integer> pendingPartition = new 
LinkedBlockingQueue<>();
   Map<Integer, List<InputAttemptIdentifier>> partitionToInput = new 
HashMap<>();
   private final Map<Integer, Roaring64NavigableMap> rssAllBlockIdBitmapMap =
-      new ConcurrentHashMap<>();
+      JavaUtils.newConcurrentMap();
   private final Map<Integer, Roaring64NavigableMap> rssSuccessBlockIdBitmapMap 
=
-      new ConcurrentHashMap<>();
+      JavaUtils.newConcurrentMap();
   private final AtomicInteger numNoDataInput = new AtomicInteger(0);
   private final AtomicInteger numWithDataInput = new AtomicInteger(0);
 
@@ -292,10 +292,10 @@ public class RssShuffleManager extends ShuffleManager {
      * not know upfront the number of spills from source.
      */
     completedInputs = new LinkedBlockingDeque<>();
-    knownSrcHosts = new ConcurrentHashMap<>();
+    knownSrcHosts = JavaUtils.newConcurrentMap();
     pendingHosts = new LinkedBlockingQueue<>();
-    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
-    rssRunningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    obsoletedInputs = Collections.newSetFromMap(JavaUtils.newConcurrentMap());
+    rssRunningFetchers = 
Collections.newSetFromMap(JavaUtils.newConcurrentMap());
 
     int maxConfiguredFetchers =
         conf.getInt(
@@ -372,7 +372,7 @@ public class RssShuffleManager extends ShuffleManager {
       Arrays.sort(this.localDisks);
     }
 
-    shuffleInfoEventsMap = new ConcurrentHashMap<>();
+    shuffleInfoEventsMap = JavaUtils.newConcurrentMap();
 
     LOG.info(
         srcNameTrimmed
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index ea2fe457c..07e0f9021 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -32,7 +32,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
@@ -98,6 +97,7 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.UnitConverter;
 
 class RssShuffleScheduler extends ShuffleScheduler {
@@ -180,7 +180,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
   // TODO Clean this and other maps at some point
   @VisibleForTesting
   final ConcurrentMap<PathPartition, InputAttemptIdentifier> 
pathToIdentifierMap =
-      new ConcurrentHashMap<>();
+      JavaUtils.newConcurrentMap();
 
   // To track shuffleInfo events when finalMerge is disabled in source or 
pipelined shuffle is
   // enabled in source.
@@ -216,7 +216,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
 
   private final int numFetchers;
   private final Set<RssTezShuffleDataFetcher> rssRunningFetchers =
-      Collections.newSetFromMap(new ConcurrentHashMap<>());
+      Collections.newSetFromMap(JavaUtils.newConcurrentMap());
 
   private final ListeningExecutorService fetcherExecutor;
 
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 78c56f96e..ac5424e01 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.UnitConverter;
 
 public class RssConf implements Cloneable {
@@ -38,7 +39,7 @@ public class RssConf implements Cloneable {
 
   /** Creates a new empty configuration. */
   public RssConf() {
-    this.settings = new ConcurrentHashMap<>();
+    this.settings = JavaUtils.newConcurrentMap();
   }
 
   /**
@@ -47,7 +48,7 @@ public class RssConf implements Cloneable {
    * @param other The configuration to copy the entries from.
    */
   public RssConf(RssConf other) {
-    this.settings = new ConcurrentHashMap<>(other.settings);
+    this.settings = JavaUtils.newConcurrentMap(other.settings);
   }
 
   public Set<String> getKeySet() {
@@ -509,7 +510,7 @@ public class RssConf implements Cloneable {
   @Override
   public RssConf clone() throws CloneNotSupportedException {
     RssConf config = (RssConf) super.clone();
-    config.settings = new ConcurrentHashMap<>(settings);
+    config.settings = JavaUtils.newConcurrentMap(settings);
     return config;
   }
 
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
index f7c9ccc53..2e289ccdd 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.common.netty.handle;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import io.netty.channel.Channel;
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.netty.client.RpcResponseCallback;
 import org.apache.uniffle.common.netty.protocol.RpcResponse;
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.NettyUtils;
 
 public class TransportResponseHandler extends MessageHandler<RpcResponse> {
@@ -41,7 +41,7 @@ public class TransportResponseHandler extends 
MessageHandler<RpcResponse> {
 
   public TransportResponseHandler(Channel channel) {
     this.channel = channel;
-    this.outstandingRpcRequests = new ConcurrentHashMap<>();
+    this.outstandingRpcRequests = JavaUtils.newConcurrentMap();
     this.timeOfLastRequestNs = new AtomicLong(0);
   }
 
diff --git a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
index bc556da3c..f8243eb25 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.util;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
@@ -60,11 +61,27 @@ public class JavaUtils {
     }
   }
 
+  public static <K, V> ConcurrentHashMap<K, V> newConcurrentMap(Map<? extends 
K, ? extends V> m) {
+    if (isJavaVersionAtLeastJava9()) {
+      return new ConcurrentHashMap<>(m);
+    } else {
+      return new ConcurrentHashMapForJDK8<>(m);
+    }
+  }
+
   /**
    * For JDK8, there is bug for ConcurrentHashMap#computeIfAbsent, checking 
the key existence to
    * speed up. See details in issue #519
    */
   private static class ConcurrentHashMapForJDK8<K, V> extends 
ConcurrentHashMap<K, V> {
+    ConcurrentHashMapForJDK8() {
+      super();
+    }
+
+    ConcurrentHashMapForJDK8(Map<? extends K, ? extends V> m) {
+      super(m);
+    }
+
     @Override
     public V computeIfAbsent(K key, Function<? super K, ? extends V> 
mappingFunction) {
       V result;
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index ff378d63d..b7ce09177 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.coordinator;
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -30,6 +29,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 
 import static org.awaitility.Awaitility.await;
@@ -95,7 +95,7 @@ public class QuotaManagerTest {
     conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 5);
     final ApplicationManager applicationManager = new ApplicationManager(conf);
     final AtomicInteger uuid = new AtomicInteger();
-    Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+    Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
     uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
     uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
     uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
@@ -137,7 +137,7 @@ public class QuotaManagerTest {
     final int i2 = uuid.incrementAndGet();
     final int i3 = uuid.incrementAndGet();
     final int i4 = uuid.incrementAndGet();
-    Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+    Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
     uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
     uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
     uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
@@ -207,7 +207,7 @@ public class QuotaManagerTest {
   }
 
   private Map<String, Long> mockUUidAppAndTime(int mockAppNum) {
-    Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+    Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
     for (int i = 0; i < mockAppNum; i++) {
       uuidAndTime.put(mockUUidAppId(), System.currentTimeMillis());
     }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8c1e39cf2..fd77a04cf 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -111,7 +110,7 @@ public class ShuffleBufferManager {
         capacity,
         readCapacity);
     this.shuffleFlushManager = shuffleFlushManager;
-    this.bufferPool = new ConcurrentHashMap<>();
+    this.bufferPool = JavaUtils.newConcurrentMap();
     this.highWaterMark =
         (long)
             (capacity

Reply via email to