This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ece8d07d [#519] Speed up ConcurrentHashMap#computeIfAbsent (#766)
ece8d07d is described below
commit ece8d07db9939a0043f80ae73df52228ebfde156
Author: Fei Wang <[email protected]>
AuthorDate: Mon Mar 27 21:30:11 2023 +0800
[#519] Speed up ConcurrentHashMap#computeIfAbsent (#766)
### What changes were proposed in this pull request?
Speed up ConcurrentHashMap#computeIfAbsent by checking key existence.
### Why are the changes needed?
Fix: #519.
According to the bug mentioned in
https://bugs.openjdk.org/browse/JDK-8161372,
we could check the key existence before invoking computeIfAbsent,
especially for
the critical path like ShuffleTaskManager#refreshAppId.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
---
.../hadoop/mapred/SortWriteBufferManager.java | 5 ++--
.../hadoop/mapred/SortWriteBufferManagerTest.java | 8 +++----
.../apache/hadoop/mapred/SortWriteBufferTest.java | 7 +++---
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 4 ++--
.../apache/spark/shuffle/RssShuffleManager.java | 8 +++----
.../apache/spark/shuffle/RssShuffleManager.java | 8 +++----
.../spark/shuffle/writer/RssShuffleWriterTest.java | 13 ++++++-----
.../client/impl/ShuffleWriteClientImpl.java | 5 ++--
.../apache/uniffle/common/metrics/GRPCMetrics.java | 10 ++++----
.../org/apache/uniffle/common/util/JavaUtils.java | 27 ++++++++++++++++++++++
.../uniffle/coordinator/ApplicationManager.java | 15 ++++++------
.../uniffle/coordinator/ClientConfManager.java | 6 ++---
.../apache/uniffle/coordinator/QuotaManager.java | 10 ++++----
.../uniffle/coordinator/SimpleClusterManager.java | 6 ++---
.../coordinator/metric/CoordinatorMetrics.java | 4 ++--
.../PartitionBalanceAssignmentStrategy.java | 6 ++---
.../coordinator/SimpleClusterManagerTest.java | 12 +++++-----
.../test/GetShuffleReportForMultiPartTest.java | 3 ++-
.../client/factory/ShuffleServerClientFactory.java | 9 ++++----
.../apache/uniffle/server/ShuffleFlushManager.java | 6 ++---
.../uniffle/server/ShuffleServerMetrics.java | 10 ++++----
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 14 +++++------
.../apache/uniffle/server/ShuffleTaskManager.java | 9 ++++----
.../uniffle/server/buffer/ShuffleBuffer.java | 4 ++--
.../server/buffer/ShuffleBufferManager.java | 7 +++---
.../uniffle/server/storage/HdfsStorageManager.java | 5 ++--
.../server/storage/LocalStorageManager.java | 3 ++-
.../server/MockedShuffleServerGrpcService.java | 6 ++---
.../uniffle/storage/common/AbstractStorage.java | 14 +++++------
.../uniffle/storage/common/LocalStorageMeta.java | 5 ++--
30 files changed, 143 insertions(+), 106 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index f36b429d..f402d423 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -49,6 +49,7 @@ import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
public class SortWriteBufferManager<K, V> {
@@ -56,7 +57,7 @@ public class SortWriteBufferManager<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(SortWriteBufferManager.class);
private final long maxMemSize;
- private final Map<Integer, SortWriteBuffer<K, V>> buffers =
Maps.newConcurrentMap();
+ private final Map<Integer, SortWriteBuffer<K, V>> buffers =
JavaUtils.newConcurrentMap();
private final Map<Integer, Integer> partitionToSeqNo = Maps.newHashMap();
private final Counters.Counter mapOutputByteCounter;
private final Counters.Counter mapOutputRecordCounter;
@@ -83,7 +84,7 @@ public class SortWriteBufferManager<K, V> {
private final long sendCheckInterval;
private final Set<Long> allBlockIds = Sets.newConcurrentHashSet();
private final int bitmapSplitNum;
- private final Map<Integer, List<Long>> partitionToBlocks =
Maps.newConcurrentMap();
+ private final Map<Integer, List<Long>> partitionToBlocks =
JavaUtils.newConcurrentMap();
private final long maxSegmentSize;
private final boolean isMemoryShuffleEnabled;
private final int numMaps;
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index d8c9131f..a94f8baa 100644
---
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -23,7 +23,6 @@ import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
@@ -42,6 +41,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -54,7 +54,7 @@ public class SortWriteBufferManagerTest {
JobConf jobConf = new JobConf(new Configuration());
SerializationFactory serializationFactory = new
SerializationFactory(jobConf);
MockShuffleWriteClient client = new MockShuffleWriteClient();
- Map<Integer, List<ShuffleServerInfo>> partitionToServers =
Maps.newConcurrentMap();
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers =
JavaUtils.newConcurrentMap();
Set<Long> successBlocks = Sets.newConcurrentHashSet();
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
@@ -154,7 +154,7 @@ public class SortWriteBufferManagerTest {
SerializationFactory serializationFactory = new
SerializationFactory(jobConf);
MockShuffleWriteClient client = new MockShuffleWriteClient();
client.setMode(2);
- Map<Integer, List<ShuffleServerInfo>> partitionToServers =
Maps.newConcurrentMap();
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers =
JavaUtils.newConcurrentMap();
Set<Long> successBlocks = Sets.newConcurrentHashSet();
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
@@ -203,7 +203,7 @@ public class SortWriteBufferManagerTest {
SerializationFactory serializationFactory = new
SerializationFactory(jobConf);
MockShuffleWriteClient client = new MockShuffleWriteClient();
client.setMode(2);
- Map<Integer, List<ShuffleServerInfo>> partitionToServers =
Maps.newConcurrentMap();
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers =
JavaUtils.newConcurrentMap();
Set<Long> successBlocks = Sets.newConcurrentHashSet();
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
index b36e3014..46474f27 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Random;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
@@ -33,6 +32,8 @@ import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.util.JavaUtils;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SortWriteBufferTest {
@@ -88,14 +89,14 @@ public class SortWriteBufferTest {
key = new BytesWritable(keyStr.getBytes());
keySerializer.serialize(key);
byte[] valueBytes = new byte[200];
- Map<String, BytesWritable> valueMap = Maps.newConcurrentMap();
+ Map<String, BytesWritable> valueMap = JavaUtils.newConcurrentMap();
Random random = new Random();
random.nextBytes(valueBytes);
value = new BytesWritable(valueBytes);
valueMap.putIfAbsent(keyStr, value);
valSerializer.serialize(value);
recordLength = buffer.addRecord(key, value);
- Map<String, Long> recordLenMap = Maps.newConcurrentMap();
+ Map<String, Long> recordLenMap = JavaUtils.newConcurrentMap();
recordLenMap.putIfAbsent(keyStr, recordLength);
keyStr = "key1";
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 2ec6b709..81edacc5 100644
---
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -28,7 +28,6 @@ import java.util.TreeMap;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -74,6 +73,7 @@ import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.compression.Lz4Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -247,7 +247,7 @@ public class FetcherTest {
SerializationFactory serializationFactory = new
SerializationFactory(jobConf);
MockShuffleWriteClient client = new MockShuffleWriteClient();
client.setMode(2);
- Map<Integer, List<ShuffleServerInfo>> partitionToServers =
Maps.newConcurrentMap();
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers =
JavaUtils.newConcurrentMap();
Set<Long> successBlocks = Sets.newConcurrentHashSet();
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index e7efbf86..e9bff490 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
@@ -65,6 +64,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -79,9 +79,9 @@ public class RssShuffleManager implements ShuffleManager {
private String appId = "";
private String clientType;
private ShuffleWriteClient shuffleWriteClient;
- private Map<String, Set<Long>> taskToSuccessBlockIds =
Maps.newConcurrentMap();
- private Map<String, Set<Long>> taskToFailedBlockIds =
Maps.newConcurrentMap();
- private Map<String, WriteBufferManager> taskToBufferManager =
Maps.newConcurrentMap();
+ private Map<String, Set<Long>> taskToSuccessBlockIds =
JavaUtils.newConcurrentMap();
+ private Map<String, Set<Long>> taskToFailedBlockIds =
JavaUtils.newConcurrentMap();
+ private Map<String, WriteBufferManager> taskToBufferManager =
JavaUtils.newConcurrentMap();
private final int dataReplica;
private final int dataReplicaWrite;
private final int dataReplicaRead;
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 504001c5..452a4b19 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -32,7 +32,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -75,6 +74,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -97,7 +97,7 @@ public class RssShuffleManager implements ShuffleManager {
private ShuffleWriteClient shuffleWriteClient;
private final Map<String, Set<Long>> taskToSuccessBlockIds;
private final Map<String, Set<Long>> taskToFailedBlockIds;
- private Map<String, WriteBufferManager> taskToBufferManager =
Maps.newConcurrentMap();
+ private Map<String, WriteBufferManager> taskToBufferManager =
JavaUtils.newConcurrentMap();
private ScheduledExecutorService heartBeatScheduledExecutorService;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
@@ -210,8 +210,8 @@ public class RssShuffleManager implements ShuffleManager {
// shuffle cluster, we don't need shuffle data locality
sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
LOG.info("Disable shuffle data locality in RssShuffleManager.");
- taskToSuccessBlockIds = Maps.newConcurrentMap();
- taskToFailedBlockIds = Maps.newConcurrentMap();
+ taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
+ taskToFailedBlockIds = JavaUtils.newConcurrentMap();
// for non-driver executor, start a thread for sending shuffle data to
shuffle server
LOG.info("RSS data send thread is starting");
eventLoop = defaultEventLoop;
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 6fe8a45f..f446cf3a 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -50,6 +50,7 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -78,8 +79,8 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
// init SparkContext
final SparkContext sc = SparkContext.getOrCreate(conf);
- Map<String, Set<Long>> failBlocks = Maps.newConcurrentMap();
- Map<String, Set<Long>> successBlocks = Maps.newConcurrentMap();
+ Map<String, Set<Long>> failBlocks = JavaUtils.newConcurrentMap();
+ Map<String, Set<Long>> successBlocks = JavaUtils.newConcurrentMap();
Serializer kryoSerializer = new KryoSerializer(conf);
RssShuffleManager manager = TestUtils.createShuffleManager(
conf,
@@ -150,7 +151,7 @@ public class RssShuffleWriterTest {
// init SparkContext
List<ShuffleBlockInfo> shuffleBlockInfos = Lists.newArrayList();
final SparkContext sc = SparkContext.getOrCreate(conf);
- Map<String, Set<Long>> successBlockIds = Maps.newConcurrentMap();
+ Map<String, Set<Long>> successBlockIds = JavaUtils.newConcurrentMap();
EventLoop<AddBlockEvent> testLoop = new EventLoop<AddBlockEvent>("test") {
@Override
public void onReceive(AddBlockEvent event) {
@@ -172,7 +173,7 @@ public class RssShuffleWriterTest {
false,
testLoop,
successBlockIds,
- Maps.newConcurrentMap());
+ JavaUtils.newConcurrentMap());
Serializer kryoSerializer = new KryoSerializer(conf);
Partitioner mockPartitioner = mock(Partitioner.class);
final ShuffleWriteClient mockShuffleWriteClient =
mock(ShuffleWriteClient.class);
@@ -287,8 +288,8 @@ public class RssShuffleWriterTest {
sparkConf,
false,
eventLoop,
- Maps.newConcurrentMap(),
- Maps.newConcurrentMap()));
+ JavaUtils.newConcurrentMap(),
+ JavaUtils.newConcurrentMap()));
RssShuffleHandle<String, String, String> mockHandle =
mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 05b3be68..07771088 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -87,6 +87,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
public class ShuffleWriteClientImpl implements ShuffleWriteClient {
@@ -98,7 +99,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
private long retryIntervalMax;
private List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
//appId -> shuffleId -> servers
- private Map<String, Map<Integer, Set<ShuffleServerInfo>>>
shuffleServerInfoMap = Maps.newConcurrentMap();
+ private Map<String, Map<Integer, Set<ShuffleServerInfo>>>
shuffleServerInfoMap = JavaUtils.newConcurrentMap();
private CoordinatorClientFactory coordinatorClientFactory;
private ExecutorService heartBeatExecutorService;
private int replica;
@@ -803,7 +804,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
void addShuffleServer(String appId, int shuffleId, ShuffleServerInfo
serverInfo) {
Map<Integer, Set<ShuffleServerInfo>> appServerMap =
shuffleServerInfoMap.get(appId);
if (appServerMap == null) {
- appServerMap = Maps.newConcurrentMap();
+ appServerMap = JavaUtils.newConcurrentMap();
shuffleServerInfoMap.put(appId, appServerMap);
}
Set<ShuffleServerInfo> shuffleServerInfos = appServerMap.get(shuffleId);
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 7909ff80..e6103d21 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -19,13 +19,13 @@ package org.apache.uniffle.common.metrics;
import java.util.Map;
-import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
public abstract class GRPCMetrics {
// Grpc server internal executor metrics
@@ -39,10 +39,10 @@ public abstract class GRPCMetrics {
private static final String GRPC_TOTAL = "grpc_total";
private boolean isRegistered = false;
- protected Map<String, Counter> counterMap = Maps.newConcurrentMap();
- protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap();
- protected Map<String, Summary> transportTimeSummaryMap =
Maps.newConcurrentMap();
- protected Map<String, Summary> processTimeSummaryMap =
Maps.newConcurrentMap();
+ protected Map<String, Counter> counterMap = JavaUtils.newConcurrentMap();
+ protected Map<String, Gauge> gaugeMap = JavaUtils.newConcurrentMap();
+ protected Map<String, Summary> transportTimeSummaryMap =
JavaUtils.newConcurrentMap();
+ protected Map<String, Summary> processTimeSummaryMap =
JavaUtils.newConcurrentMap();
private Gauge gaugeGrpcOpen;
private Counter counterGrpcTotal;
protected MetricsManager metricsManager;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
index 318a0ca9..ebbe34c5 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
@@ -19,7 +19,11 @@ package org.apache.uniffle.common.util;
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.commons.lang3.JavaVersion;
+import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,4 +40,27 @@ public class JavaUtils {
logger.error("IOException should not have been thrown.", e);
}
}
+
+ public static <K, V> ConcurrentHashMap<K, V> newConcurrentMap() {
+ if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+ return new ConcurrentHashMap();
+ } else {
+ return new ConcurrentHashMapForJDK8();
+ }
+ }
+
+ /**
+ * For JDK8, there is bug for ConcurrentHashMap#computeIfAbsent, checking
the key existence to
+ * speed up. See details in issue #519
+ */
+ private static class ConcurrentHashMapForJDK8<K, V> extends
ConcurrentHashMap<K, V> {
+ @Override
+ public V computeIfAbsent(K key, Function<? super K, ? extends V>
mappingFunction) {
+ V result;
+ if (null == (result = get(key))) {
+ result = super.computeIfAbsent(key, mappingFunction);
+ }
+ return result;
+ }
+ }
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index b5e3f740..2c842428 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -60,21 +61,21 @@ public class ApplicationManager implements Closeable {
private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
// store remote path -> application count for assignment strategy
private final Map<String, RankValue> remoteStoragePathRankValue;
- private final Map<String, String> remoteStorageToHost =
Maps.newConcurrentMap();
+ private final Map<String, String> remoteStorageToHost =
JavaUtils.newConcurrentMap();
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
private final ScheduledExecutorService detectStorageScheduler;
private final ScheduledExecutorService checkAppScheduler;
- private Map<String, Map<String, Long>> currentUserAndApp =
Maps.newConcurrentMap();
- private Map<String, String> appIdToUser = Maps.newConcurrentMap();
+ private Map<String, Map<String, Long>> currentUserAndApp =
JavaUtils.newConcurrentMap();
+ private Map<String, String> appIdToUser = JavaUtils.newConcurrentMap();
private QuotaManager quotaManager;
// it's only for test case to check if status check has problem
private boolean hasErrorInStatusCheck = false;
public ApplicationManager(CoordinatorConf conf) {
storageStrategy =
conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY);
- appIdToRemoteStorageInfo = Maps.newConcurrentMap();
- remoteStoragePathRankValue = Maps.newConcurrentMap();
- availableRemoteStorageInfo = Maps.newConcurrentMap();
+ appIdToRemoteStorageInfo = JavaUtils.newConcurrentMap();
+ remoteStoragePathRankValue = JavaUtils.newConcurrentMap();
+ availableRemoteStorageInfo = JavaUtils.newConcurrentMap();
if (StrategyName.IO_SAMPLE == storageStrategy) {
selectStorageStrategy = new
LowestIOSampleCostSelectStorageStrategy(remoteStoragePathRankValue,
appIdToRemoteStorageInfo, availableRemoteStorageInfo, conf);
@@ -110,7 +111,7 @@ public class ApplicationManager implements Closeable {
public void registerApplicationInfo(String appId, String user) {
// using computeIfAbsent is just for MR and spark which is used
RssShuffleManager as implementation class
// in such case by default, there is no currentUserAndApp, so a unified
user implementation named "user" is used.
- Map<String, Long> appAndTime = currentUserAndApp.computeIfAbsent(user, x
-> Maps.newConcurrentMap());
+ Map<String, Long> appAndTime = currentUserAndApp.computeIfAbsent(user, x
-> JavaUtils.newConcurrentMap());
appIdToUser.put(appId, user);
if (!appAndTime.containsKey(appId)) {
CoordinatorMetrics.counterTotalAppNum.inc();
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
index 1c3959eb..83bf6cab 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.Maps;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -39,12 +38,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
public class ClientConfManager implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(ClientConfManager.class);
- private Map<String, String> clientConf = Maps.newConcurrentMap();
+ private Map<String, String> clientConf = JavaUtils.newConcurrentMap();
private final AtomicLong lastCandidatesUpdateMS = new AtomicLong(0L);
private Path path;
private ScheduledExecutorService updateClientConfSES = null;
@@ -100,7 +100,7 @@ public class ClientConfManager implements Closeable {
}
private void updateClientConfInternal() {
- Map<String, String> newClientConf = Maps.newConcurrentMap();
+ Map<String, String> newClientConf = JavaUtils.newConcurrentMap();
String content = loadClientConfContent();
if (StringUtils.isEmpty(content)) {
clientConf = newClientConf;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
index 297910f1..6e39c59d 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -47,13 +47,13 @@ import
org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
*/
public class QuotaManager {
private static final Logger LOG =
LoggerFactory.getLogger(QuotaManager.class);
- private final Map<String, Map<String, Long>> currentUserAndApp =
Maps.newConcurrentMap();
- private final Map<String, String> appIdToUser = Maps.newConcurrentMap();
+ private final Map<String, Map<String, Long>> currentUserAndApp =
JavaUtils.newConcurrentMap();
+ private final Map<String, String> appIdToUser = JavaUtils.newConcurrentMap();
private final String quotaFilePath;
private final Integer quotaAppNum;
private FileSystem hadoopFileSystem;
private final AtomicLong quotaFileLastModify = new AtomicLong(0L);
- private final Map<String, Integer> defaultUserApps = Maps.newConcurrentMap();
+ private final Map<String, Integer> defaultUserApps =
JavaUtils.newConcurrentMap();
public QuotaManager(CoordinatorConf conf) {
this.quotaFilePath =
conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);
@@ -117,7 +117,7 @@ public class QuotaManager {
}
public boolean checkQuota(String user, String uuid) {
- Map<String, Long> appAndTimes = currentUserAndApp.computeIfAbsent(user, x
-> Maps.newConcurrentMap());
+ Map<String, Long> appAndTimes = currentUserAndApp.computeIfAbsent(user, x
-> JavaUtils.newConcurrentMap());
Integer defaultAppNum = defaultUserApps.getOrDefault(user, quotaAppNum);
synchronized (this) {
int currentAppNum = appAndTimes.size();
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 4e5e4597..a059428d 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -37,7 +37,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +54,7 @@ import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -62,11 +62,11 @@ public class SimpleClusterManager implements ClusterManager
{
private static final Logger LOG =
LoggerFactory.getLogger(SimpleClusterManager.class);
- private final Map<String, ServerNode> servers = Maps.newConcurrentMap();
+ private final Map<String, ServerNode> servers = JavaUtils.newConcurrentMap();
private final Cache<ServerNode, ShuffleServerInternalGrpcClient> clientCache;
private Set<String> excludeNodes = Sets.newConcurrentHashSet();
// tag -> nodes
- private Map<String, Set<ServerNode>> tagToNodes = Maps.newConcurrentMap();
+ private Map<String, Set<ServerNode>> tagToNodes =
JavaUtils.newConcurrentMap();
private AtomicLong excludeLastModify = new AtomicLong(0L);
private long heartbeatTimeout;
private volatile int shuffleNodesMax;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index b210ca11..f06a339b 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -20,13 +20,13 @@ package org.apache.uniffle.coordinator.metric;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
public class CoordinatorMetrics {
@@ -54,7 +54,7 @@ public class CoordinatorMetrics {
public static Counter counterTotalCandidatesDeniedRequest;
public static Counter counterTotalQuotaDeniedRequest;
public static Counter counterTotalLoadDeniedRequest;
- public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE =
Maps.newConcurrentMap();
+ public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE =
JavaUtils.newConcurrentMap();
private static MetricsManager metricsManager;
private static boolean isRegister = false;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index ce6a577e..38b95714 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -25,12 +25,12 @@ import java.util.Set;
import java.util.SortedMap;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.coordinator.ClusterManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.ServerNode;
@@ -58,7 +58,7 @@ public class PartitionBalanceAssignmentStrategy extends
AbstractAssignmentStrate
private static final Logger LOG =
LoggerFactory.getLogger(PartitionBalanceAssignmentStrategy.class);
private ClusterManager clusterManager;
- private Map<ServerNode, PartitionAssignmentInfo> serverToPartitions =
Maps.newConcurrentMap();
+ private Map<ServerNode, PartitionAssignmentInfo> serverToPartitions =
JavaUtils.newConcurrentMap();
public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager,
CoordinatorConf conf) {
super(conf);
@@ -81,7 +81,7 @@ public class PartitionBalanceAssignmentStrategy extends
AbstractAssignmentStrate
SortedMap<PartitionRange, List<ServerNode>> assignments;
synchronized (this) {
List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
- Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos =
Maps.newConcurrentMap();
+ Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos =
JavaUtils.newConcurrentMap();
for (ServerNode node : nodes) {
PartitionAssignmentInfo partitionInfo;
if (serverToPartitions.containsKey(node)) {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 123dca4d..19eec9bd 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
@@ -39,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static org.awaitility.Awaitility.await;
@@ -136,11 +136,11 @@ public class SimpleClusterManagerTest {
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, nettyTags, true, ServerStatus.ACTIVE, Maps.newConcurrentMap(),
1);
+ 10, nettyTags, true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, nettyTags, true, ServerStatus.ACTIVE, Maps.newConcurrentMap(),
1);
+ 10, nettyTags, true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, nettyTags, true, ServerStatus.ACTIVE, Maps.newConcurrentMap(),
1);
+ 11, nettyTags, true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
11, grpcTags, true);
clusterManager.add(sn1);
@@ -162,10 +162,10 @@ public class SimpleClusterManagerTest {
// tag changes
sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, Sets.newHashSet("new_tag"), true, ServerStatus.ACTIVE,
Maps.newConcurrentMap(), 1);
+ 10, Sets.newHashSet("new_tag"), true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, Sets.newHashSet("test", "new_tag"),
- true, ServerStatus.ACTIVE, Maps.newConcurrentMap(), 1);
+ true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
10, grpcTags, true);
clusterManager.add(sn1);
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index 6a7553ae..2aea4d36 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -46,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.MockedShuffleServerGrpcService;
@@ -201,7 +202,7 @@ public class GetShuffleReportForMultiPartTest extends
SparkIntegrationTestBase {
public static class RssShuffleManagerWrapper extends RssShuffleManager {
// shuffleId -> partShouldRequestNum
- Map<Integer, AtomicInteger> shuffleToPartShouldRequestNum =
Maps.newConcurrentMap();
+ Map<Integer, AtomicInteger> shuffleToPartShouldRequestNum =
JavaUtils.newConcurrentMap();
public RssShuffleManagerWrapper(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 060f6626..cc0896ac 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -19,19 +19,18 @@ package org.apache.uniffle.client.factory;
import java.util.Map;
-import com.google.common.collect.Maps;
-
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.JavaUtils;
public class ShuffleServerClientFactory {
private Map<String, Map<ShuffleServerInfo, ShuffleServerClient>> clients;
private ShuffleServerClientFactory() {
- clients = Maps.newConcurrentMap();
+ clients = JavaUtils.newConcurrentMap();
}
private static class LazyHolder {
@@ -52,7 +51,7 @@ public class ShuffleServerClientFactory {
public synchronized ShuffleServerClient getShuffleServerClient(
String clientType, ShuffleServerInfo shuffleServerInfo) {
- clients.putIfAbsent(clientType, Maps.newConcurrentMap());
+ clients.putIfAbsent(clientType, JavaUtils.newConcurrentMap());
Map<ShuffleServerInfo, ShuffleServerClient> serverToClients =
clients.get(clientType);
if (serverToClients.get(shuffleServerInfo) == null) {
serverToClients.put(shuffleServerInfo,
createShuffleServerClient(clientType, shuffleServerInfo));
@@ -63,6 +62,6 @@ public class ShuffleServerClientFactory {
// Only for tests
public synchronized void cleanupCache() {
clients.values().stream().flatMap(x ->
x.values().stream()).forEach(ShuffleServerClient::close);
- this.clients = Maps.newConcurrentMap();
+ this.clients = JavaUtils.newConcurrentMap();
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 7c5b2df6..b81453cf 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.storage.MultiStorageManager;
@@ -62,7 +62,7 @@ public class ShuffleFlushManager {
private final ShuffleServerConf shuffleServerConf;
private Configuration hadoopConf;
// appId -> shuffleId -> committed shuffle blockIds
- private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
Maps.newConcurrentMap();
+ private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
JavaUtils.newConcurrentMap();
private final int retryMax;
private final StorageManager storageManager;
@@ -265,7 +265,7 @@ public class ShuffleFlushManager {
return;
}
if (!committedBlockIds.containsKey(appId)) {
- committedBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
+ committedBlockIds.putIfAbsent(appId, JavaUtils.newConcurrentMap());
}
Map<Integer, Roaring64NavigableMap> shuffleToBlockIds =
committedBlockIds.get(appId);
shuffleToBlockIds.putIfAbsent(shuffleId, Roaring64NavigableMap.bitmapOf());
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index f2d9f3ed..098560d1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -20,13 +20,13 @@ package org.apache.uniffle.server;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.common.LocalStorage;
@@ -158,10 +158,10 @@ public class ShuffleServerMetrics {
public static synchronized void register(CollectorRegistry
collectorRegistry) {
if (!isRegister) {
- counterRemoteStorageTotalWrite = Maps.newConcurrentMap();
- counterRemoteStorageRetryWrite = Maps.newConcurrentMap();
- counterRemoteStorageFailedWrite = Maps.newConcurrentMap();
- counterRemoteStorageSuccessWrite = Maps.newConcurrentMap();
+ counterRemoteStorageTotalWrite = JavaUtils.newConcurrentMap();
+ counterRemoteStorageRetryWrite = JavaUtils.newConcurrentMap();
+ counterRemoteStorageFailedWrite = JavaUtils.newConcurrentMap();
+ counterRemoteStorageSuccessWrite = JavaUtils.newConcurrentMap();
metricsManager = new MetricsManager(collectorRegistry);
isRegister = true;
setUpMetrics();
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index dc76e5f8..0cbfaa7a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -24,13 +24,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.util.JavaUtils;
/**
* ShuffleTaskInfo contains the information of submitting the shuffle,
@@ -68,13 +68,13 @@ public class ShuffleTaskInfo {
public ShuffleTaskInfo(String appId) {
this.appId = appId;
this.currentTimes = System.currentTimeMillis();
- this.commitCounts = Maps.newConcurrentMap();
- this.commitLocks = Maps.newConcurrentMap();
- this.cachedBlockIds = Maps.newConcurrentMap();
+ this.commitCounts = JavaUtils.newConcurrentMap();
+ this.commitLocks = JavaUtils.newConcurrentMap();
+ this.cachedBlockIds = JavaUtils.newConcurrentMap();
this.user = new AtomicReference<>();
this.dataDistType = new AtomicReference<>();
- this.partitionDataSizes = Maps.newConcurrentMap();
- this.hugePartitionTags = Maps.newConcurrentMap();
+ this.partitionDataSizes = JavaUtils.newConcurrentMap();
+ this.hugePartitionTags = JavaUtils.newConcurrentMap();
this.existHugePartition = new AtomicBoolean(false);
}
@@ -117,7 +117,7 @@ public class ShuffleTaskInfo {
public long addPartitionDataSize(int shuffleId, int partitionId, long delta)
{
totalDataSize.addAndGet(delta);
- partitionDataSizes.computeIfAbsent(shuffleId, key ->
Maps.newConcurrentMap());
+ partitionDataSizes.computeIfAbsent(shuffleId, key ->
JavaUtils.newConcurrentMap());
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
partitions.putIfAbsent(partitionId, 0L);
return partitions.computeIfPresent(partitionId, (k, v) -> v + delta);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 6718062e..d0103905 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -55,6 +55,7 @@ import
org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
@@ -92,8 +93,8 @@ public class ShuffleTaskManager {
// but when get blockId, performance will degrade a little which can be
optimized by client configuration
private Map<String, Map<Integer, Roaring64NavigableMap[]>>
partitionsToBlockIds;
private final ShuffleBufferManager shuffleBufferManager;
- private Map<String, ShuffleTaskInfo> shuffleTaskInfos =
Maps.newConcurrentMap();
- private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
Maps.newConcurrentMap();
+ private Map<String, ShuffleTaskInfo> shuffleTaskInfos =
JavaUtils.newConcurrentMap();
+ private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
JavaUtils.newConcurrentMap();
private Runnable clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
@@ -104,7 +105,7 @@ public class ShuffleTaskManager {
StorageManager storageManager) {
this.conf = conf;
this.shuffleFlushManager = shuffleFlushManager;
- this.partitionsToBlockIds = Maps.newConcurrentMap();
+ this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
this.shuffleBufferManager = shuffleBufferManager;
this.storageManager = storageManager;
this.appExpiredWithoutHB =
conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
@@ -191,7 +192,7 @@ public class ShuffleTaskManager {
refreshAppId(appId);
shuffleTaskInfos.get(appId).setUser(user);
shuffleTaskInfos.get(appId).setDataDistType(dataDistType);
- partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
+ partitionsToBlockIds.putIfAbsent(appId, JavaUtils.newConcurrentMap());
for (PartitionRange partitionRange : partitionRanges) {
shuffleBufferManager.registerBuffer(appId, shuffleId,
partitionRange.getStart(), partitionRange.getEnd());
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index dc305c67..78d76b38 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -25,7 +25,6 @@ import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +35,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
@@ -55,7 +55,7 @@ public class ShuffleBuffer {
this.capacity = capacity;
this.size = 0;
this.blocks = new LinkedList<>();
- this.inFlushBlockMap = Maps.newConcurrentMap();
+ this.inFlushBlockMap = JavaUtils.newConcurrentMap();
}
public long append(ShufflePartitionedData data) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index a69a28e1..316ae3a3 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
@@ -75,7 +76,7 @@ public class ShuffleBufferManager {
// appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId
protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>>
bufferPool;
// appId -> shuffleId -> shuffle size in buffer
- protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap =
Maps.newConcurrentMap();
+ protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap =
JavaUtils.newConcurrentMap();
public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager
shuffleFlushManager) {
long heapSize = Runtime.getRuntime().maxMemory();
@@ -109,7 +110,7 @@ public class ShuffleBufferManager {
}
public StatusCode registerBuffer(String appId, int shuffleId, int
startPartition, int endPartition) {
- bufferPool.putIfAbsent(appId, Maps.newConcurrentMap());
+ bufferPool.putIfAbsent(appId, JavaUtils.newConcurrentMap());
Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers =
bufferPool.get(appId);
shuffleIdToBuffers.putIfAbsent(shuffleId, TreeRangeMap.create());
RangeMap<Integer, ShuffleBuffer> bufferRangeMap =
shuffleIdToBuffers.get(shuffleId);
@@ -162,7 +163,7 @@ public class ShuffleBufferManager {
}
private void updateShuffleSize(String appId, int shuffleId, long size) {
- shuffleSizeMap.putIfAbsent(appId, Maps.newConcurrentMap());
+ shuffleSizeMap.putIfAbsent(appId, JavaUtils.newConcurrentMap());
Map<Integer, AtomicLong> shuffleIdToSize = shuffleSizeMap.get(appId);
shuffleIdToSize.putIfAbsent(shuffleId, new AtomicLong(0));
shuffleIdToSize.get(shuffleId).addAndGet(size);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index eefc0e5a..7fddb86a 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
@@ -56,8 +57,8 @@ public class HdfsStorageManager extends SingleStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(HdfsStorageManager.class);
private final Configuration hadoopConf;
- private Map<String, HdfsStorage> appIdToStorages = Maps.newConcurrentMap();
- private Map<String, HdfsStorage> pathToStorages = Maps.newConcurrentMap();
+ private Map<String, HdfsStorage> appIdToStorages =
JavaUtils.newConcurrentMap();
+ private Map<String, HdfsStorage> pathToStorages =
JavaUtils.newConcurrentMap();
HdfsStorageManager(ShuffleServerConf conf) {
super(conf);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 84ccf173..96649b68 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -51,6 +51,7 @@ import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.LocalStorageChecker;
@@ -90,7 +91,7 @@ public class LocalStorageManager extends SingleStorageManager
{
if (CollectionUtils.isEmpty(storageBasePaths)) {
throw new IllegalArgumentException("Base path dirs must not be empty");
}
- this.partitionsOfStorage = Maps.newConcurrentMap();
+ this.partitionsOfStorage = JavaUtils.newConcurrentMap();
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
double highWaterMarkOfWrite =
conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
diff --git
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index a603ff62..95b814bd 100644
---
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -22,12 +22,12 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.proto.RssProtos;
@@ -36,7 +36,7 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
private static final Logger LOG =
LoggerFactory.getLogger(MockedShuffleServerGrpcService.class);
// appId -> shuffleId -> partitionRequestNum
- private Map<String, Map<Integer, AtomicInteger>> appToPartitionRequest =
Maps.newConcurrentMap();
+ private Map<String, Map<Integer, AtomicInteger>> appToPartitionRequest =
JavaUtils.newConcurrentMap();
private long mockedTimeout = -1L;
@@ -98,7 +98,7 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
if (recordGetShuffleResult) {
List<Integer> requestPartitions = request.getPartitionsList();
Map<Integer, AtomicInteger> shuffleIdToPartitionRequestNum =
appToPartitionRequest.computeIfAbsent(
- request.getAppId(), x -> Maps.newConcurrentMap());
+ request.getAppId(), x -> JavaUtils.newConcurrentMap());
AtomicInteger partitionRequestNum =
shuffleIdToPartitionRequestNum.computeIfAbsent(
request.getShuffleId(), x -> new AtomicInteger(0));
partitionRequestNum.addAndGet(requestPartitions.size());
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
index d2c7884d..641dcb26 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
@@ -20,8 +20,8 @@ package org.apache.uniffle.storage.common;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -31,16 +31,16 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
public abstract class AbstractStorage implements Storage {
- private Map<String, Map<String, ShuffleWriteHandler>> writerHandlers =
Maps.newConcurrentMap();
- private Map<String, Map<String, CreateShuffleWriteHandlerRequest>> requests
= Maps.newConcurrentMap();
- private Map<String, Map<String, ServerReadHandler>> readerHandlers =
Maps.newConcurrentMap();
+ private Map<String, Map<String, ShuffleWriteHandler>> writerHandlers =
JavaUtils.newConcurrentMap();
+ private Map<String, Map<String, CreateShuffleWriteHandlerRequest>> requests
= JavaUtils.newConcurrentMap();
+ private Map<String, Map<String, ServerReadHandler>> readerHandlers =
JavaUtils.newConcurrentMap();
abstract ShuffleWriteHandler
newWriteHandler(CreateShuffleWriteHandlerRequest request);
@Override
public ShuffleWriteHandler
getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request) {
- writerHandlers.computeIfAbsent(request.getAppId(), key ->
Maps.newConcurrentMap());
- requests.computeIfAbsent(request.getAppId(), key ->
Maps.newConcurrentMap());
+ writerHandlers.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
+ requests.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
Map<String, ShuffleWriteHandler> map =
writerHandlers.get(request.getAppId());
String partitionKey = RssUtils.generatePartitionKey(
request.getAppId(),
@@ -55,7 +55,7 @@ public abstract class AbstractStorage implements Storage {
@Override
public ServerReadHandler
getOrCreateReadHandler(CreateShuffleReadHandlerRequest request) {
- readerHandlers.computeIfAbsent(request.getAppId(), key ->
Maps.newConcurrentMap());
+ readerHandlers.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
Map<String, ServerReadHandler> map =
readerHandlers.get(request.getAppId());
int[] range = ShuffleStorageUtils.getPartitionRange(
request.getPartitionId(),
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 71b4185d..e67cd87b 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -28,11 +28,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.util.JavaUtils;
+
/**
* Metadata has three dimensions from top to down including disk, shuffle,
partition.
* And each dimension contains two aspects, status data and indicator data.
@@ -44,7 +45,7 @@ public class LocalStorageMeta {
private static final Logger LOG =
LoggerFactory.getLogger(LocalStorageMeta.class);
private final AtomicLong size = new AtomicLong(0L);
- private final Map<String, ShuffleMeta> shuffleMetaMap =
Maps.newConcurrentMap();
+ private final Map<String, ShuffleMeta> shuffleMetaMap =
JavaUtils.newConcurrentMap();
// todo: add ut
public List<String> getSortedShuffleKeys(boolean checkRead, int hint) {