This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c3cbdec4a [#519][FOLLOWUP] improvement: Speed up
ConcurrentHashMap#computeIfAbsent (#1719)
c3cbdec4a is described below
commit c3cbdec4a099efbba0a13d2cc53aafd9493ae498
Author: RickyMa <[email protected]>
AuthorDate: Fri May 17 20:22:19 2024 +0800
[#519][FOLLOWUP] improvement: Speed up ConcurrentHashMap#computeIfAbsent
(#1719)
### What changes were proposed in this pull request?
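Speed up ConcurrentHashMap#computeIfAbsent on JDK 8 by checking for key existence first. This follow-up replaces the remaining direct `new ConcurrentHashMap<>()` constructions with `JavaUtils.newConcurrentMap()`. It also adds a copy-constructor overload, `JavaUtils.newConcurrentMap(Map)`, so that copied maps get the JDK 8 workaround too.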
### Why are the changes needed?
A follow-up PR for https://github.com/apache/incubator-uniffle/issues/519.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../org/apache/tez/dag/app/TezRemoteShuffleManager.java | 4 ++--
.../library/common/shuffle/impl/RssShuffleManager.java | 14 +++++++-------
.../shuffle/orderedgrouped/RssShuffleScheduler.java | 6 +++---
.../java/org/apache/uniffle/common/config/RssConf.java | 7 ++++---
.../common/netty/handle/TransportResponseHandler.java | 4 ++--
.../java/org/apache/uniffle/common/util/JavaUtils.java | 17 +++++++++++++++++
.../apache/uniffle/coordinator/QuotaManagerTest.java | 8 ++++----
.../uniffle/server/buffer/ShuffleBufferManager.java | 3 +--
8 files changed, 40 insertions(+), 23 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 59bfc8f19..b37d9486f 100644
---
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
@@ -64,6 +63,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import static
org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;
@@ -154,7 +154,7 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
private class TezRemoteShuffleUmbilicalProtocolImpl implements
TezRemoteShuffleUmbilicalProtocol {
private Map<Integer, ShuffleAssignmentsInfo> shuffleIdToShuffleAssignsInfo
=
- new ConcurrentHashMap<>();
+ JavaUtils.newConcurrentMap();
@Override
public long getProtocolVersion(String s, long l) throws IOException {
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
index bcd089b4f..311d09f68 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -100,6 +99,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.JavaUtils;
// This only knows how to deal with a single srcIndex for a given targetIndex.
// In case the src task generates multiple outputs for the same target Index
@@ -211,9 +211,9 @@ public class RssShuffleManager extends ShuffleManager {
private final BlockingQueue<Integer> pendingPartition = new
LinkedBlockingQueue<>();
Map<Integer, List<InputAttemptIdentifier>> partitionToInput = new
HashMap<>();
private final Map<Integer, Roaring64NavigableMap> rssAllBlockIdBitmapMap =
- new ConcurrentHashMap<>();
+ JavaUtils.newConcurrentMap();
private final Map<Integer, Roaring64NavigableMap> rssSuccessBlockIdBitmapMap
=
- new ConcurrentHashMap<>();
+ JavaUtils.newConcurrentMap();
private final AtomicInteger numNoDataInput = new AtomicInteger(0);
private final AtomicInteger numWithDataInput = new AtomicInteger(0);
@@ -292,10 +292,10 @@ public class RssShuffleManager extends ShuffleManager {
* not know upfront the number of spills from source.
*/
completedInputs = new LinkedBlockingDeque<>();
- knownSrcHosts = new ConcurrentHashMap<>();
+ knownSrcHosts = JavaUtils.newConcurrentMap();
pendingHosts = new LinkedBlockingQueue<>();
- obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
- rssRunningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ obsoletedInputs = Collections.newSetFromMap(JavaUtils.newConcurrentMap());
+ rssRunningFetchers =
Collections.newSetFromMap(JavaUtils.newConcurrentMap());
int maxConfiguredFetchers =
conf.getInt(
@@ -372,7 +372,7 @@ public class RssShuffleManager extends ShuffleManager {
Arrays.sort(this.localDisks);
}
- shuffleInfoEventsMap = new ConcurrentHashMap<>();
+ shuffleInfoEventsMap = JavaUtils.newConcurrentMap();
LOG.info(
srcNameTrimmed
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index ea2fe457c..07e0f9021 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
@@ -98,6 +97,7 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.UnitConverter;
class RssShuffleScheduler extends ShuffleScheduler {
@@ -180,7 +180,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
// TODO Clean this and other maps at some point
@VisibleForTesting
final ConcurrentMap<PathPartition, InputAttemptIdentifier>
pathToIdentifierMap =
- new ConcurrentHashMap<>();
+ JavaUtils.newConcurrentMap();
// To track shuffleInfo events when finalMerge is disabled in source or
pipelined shuffle is
// enabled in source.
@@ -216,7 +216,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
private final int numFetchers;
private final Set<RssTezShuffleDataFetcher> rssRunningFetchers =
- Collections.newSetFromMap(new ConcurrentHashMap<>());
+ Collections.newSetFromMap(JavaUtils.newConcurrentMap());
private final ListeningExecutorService fetcherExecutor;
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 78c56f96e..ac5424e01 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.UnitConverter;
public class RssConf implements Cloneable {
@@ -38,7 +39,7 @@ public class RssConf implements Cloneable {
/** Creates a new empty configuration. */
public RssConf() {
- this.settings = new ConcurrentHashMap<>();
+ this.settings = JavaUtils.newConcurrentMap();
}
/**
@@ -47,7 +48,7 @@ public class RssConf implements Cloneable {
* @param other The configuration to copy the entries from.
*/
public RssConf(RssConf other) {
- this.settings = new ConcurrentHashMap<>(other.settings);
+ this.settings = JavaUtils.newConcurrentMap(other.settings);
}
public Set<String> getKeySet() {
@@ -509,7 +510,7 @@ public class RssConf implements Cloneable {
@Override
public RssConf clone() throws CloneNotSupportedException {
RssConf config = (RssConf) super.clone();
- config.settings = new ConcurrentHashMap<>(settings);
+ config.settings = JavaUtils.newConcurrentMap(settings);
return config;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
b/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
index f7c9ccc53..2e289ccdd 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportResponseHandler.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.common.netty.handle;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.Channel;
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.netty.client.RpcResponseCallback;
import org.apache.uniffle.common.netty.protocol.RpcResponse;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.NettyUtils;
public class TransportResponseHandler extends MessageHandler<RpcResponse> {
@@ -41,7 +41,7 @@ public class TransportResponseHandler extends
MessageHandler<RpcResponse> {
public TransportResponseHandler(Channel channel) {
this.channel = channel;
- this.outstandingRpcRequests = new ConcurrentHashMap<>();
+ this.outstandingRpcRequests = JavaUtils.newConcurrentMap();
this.timeOfLastRequestNs = new AtomicLong(0);
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
index bc556da3c..f8243eb25 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.util;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@@ -60,11 +61,27 @@ public class JavaUtils {
}
}
+ public static <K, V> ConcurrentHashMap<K, V> newConcurrentMap(Map<? extends
K, ? extends V> m) {
+ if (isJavaVersionAtLeastJava9()) {
+ return new ConcurrentHashMap<>(m);
+ } else {
+ return new ConcurrentHashMapForJDK8<>(m);
+ }
+ }
+
/**
* For JDK8, there is bug for ConcurrentHashMap#computeIfAbsent, checking
the key existence to
* speed up. See details in issue #519
*/
private static class ConcurrentHashMapForJDK8<K, V> extends
ConcurrentHashMap<K, V> {
+ ConcurrentHashMapForJDK8() {
+ super();
+ }
+
+ ConcurrentHashMapForJDK8(Map<? extends K, ? extends V> m) {
+ super(m);
+ }
+
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V>
mappingFunction) {
V result;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index ff378d63d..b7ce09177 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.coordinator;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +29,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static org.awaitility.Awaitility.await;
@@ -95,7 +95,7 @@ public class QuotaManagerTest {
conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 5);
final ApplicationManager applicationManager = new ApplicationManager(conf);
final AtomicInteger uuid = new AtomicInteger();
- Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+ Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
@@ -137,7 +137,7 @@ public class QuotaManagerTest {
final int i2 = uuid.incrementAndGet();
final int i3 = uuid.incrementAndGet();
final int i4 = uuid.incrementAndGet();
- Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+ Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
@@ -207,7 +207,7 @@ public class QuotaManagerTest {
}
private Map<String, Long> mockUUidAppAndTime(int mockAppNum) {
- Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+ Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
for (int i = 0; i < mockAppNum; i++) {
uuidAndTime.put(mockUUidAppId(), System.currentTimeMillis());
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8c1e39cf2..fd77a04cf 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -111,7 +110,7 @@ public class ShuffleBufferManager {
capacity,
readCapacity);
this.shuffleFlushManager = shuffleFlushManager;
- this.bufferPool = new ConcurrentHashMap<>();
+ this.bufferPool = JavaUtils.newConcurrentMap();
this.highWaterMark =
(long)
(capacity