This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ed9c775ee [ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent
performance on jdk1.8 a workaround (#5008)
ed9c775ee is described below
commit ed9c775ee371eb6cbca9915f98f44f63cccc7f8b
Author: mxsm <[email protected]>
AuthorDate: Thu Sep 8 20:50:32 2022 +0800
[ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent performance on jdk1.8
a workaround (#5008)
---
.../common/utils/ConcurrentHashMapUtils.java | 21 ++++---
.../common/utils/ConcurrentHashMapUtilsTest.java | 39 +++++++++++++
.../namesrv/routeinfo/RouteInfoManager.java | 65 +++++++++++-----------
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 4 +-
.../proxy/processor/ReceiptHandleProcessor.java | 3 +-
.../proxy/service/channel/ChannelManager.java | 4 +-
.../store/ha/autoswitch/AutoSwitchHAService.java | 3 +-
.../rocketmq/store/queue/QueueOffsetAssigner.java | 8 +--
8 files changed, 95 insertions(+), 52 deletions(-)
diff --git
a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
similarity index 77%
rename from
srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
rename to
common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
index cc98eb5bc..b994276c9 100644
---
a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
+++
b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
@@ -14,23 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.srvutil;
+package org.apache.rocketmq.common.utils;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
-public class ConcurrentHashMapUtil {
+public abstract class ConcurrentHashMapUtils {
+
private static final boolean IS_JDK8;
static {
- // Java 8 or lower: 1.6.0_23, 1.7.0, 1.7.0_80, 1.8.0_211
- // Java 9 or higher: 9.0.1, 11.0.4, 12, 12.0.1
+ // Java 8 reports java.version as "1.8.0_<update>", e.g. 1.8.0_211
+ // Java 9+ drops the "1." prefix, e.g. 9.0.1, 11.0.4, 17
IS_JDK8 = System.getProperty("java.version").startsWith("1.8.");
}
- private ConcurrentHashMapUtil() {
- }
-
/**
* A temporary workaround for Java 8 specific performance issue
JDK-8161372 .<br> Use implementation of
* ConcurrentMap.computeIfAbsent instead.
@@ -39,10 +37,11 @@ public class ConcurrentHashMapUtil {
*/
public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key,
Function<? super K, ? extends V> func) {
if (IS_JDK8) {
- V v, newValue;
- return ((v = map.get(key)) == null &&
- (newValue = func.apply(key)) != null &&
- (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+ V v = map.get(key);
+ if (null == v) {
+ v = map.computeIfAbsent(key, func);
+ }
+ return v;
} else {
return map.computeIfAbsent(key, func);
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
new file mode 100644
index 000000000..89a4b0cda
--- /dev/null
+++
b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ConcurrentHashMapUtilsTest {
+
+ @Test
+ public void computeIfAbsent() {
+
+ ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
+ map.put("123", "1111");
+ String value = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k ->
"234");
+ assertEquals("1111", value);
+ String value1 = ConcurrentHashMapUtils.computeIfAbsent(map, "1232", k
-> "2342");
+ assertEquals("2342", value1);
+ String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k
-> "2342");
+ assertEquals("1111", value2);
+ }
+}
\ No newline at end of file
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ace963ca9..91ec4ca1f 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode;
import
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -201,16 +202,16 @@ public class RouteInfoManager {
}
public RegisterBrokerResult registerBroker(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final String zoneName,
- final Long timeoutMillis,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final Channel channel) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final String zoneName,
+ final Long timeoutMillis,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final Channel channel) {
return registerBroker(clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, zoneName, timeoutMillis, false, topicConfigWrapper,
filterServerList, channel);
}
@@ -231,7 +232,7 @@ public class RouteInfoManager {
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
- Set<String> brokerNames =
this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
+ Set<String> brokerNames =
ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>)
this.clusterAddrTable, clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
boolean registerFirst = false;
@@ -273,8 +274,8 @@ public class RouteInfoManager {
long newStateVersion =
topicConfigWrapper.getDataVersion().getStateVersion();
if (oldStateVersion > newStateVersion) {
log.warn("Registered Broker conflicts with the existed
one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
- "Old BrokerAddr:{}, Old Version:{},
New BrokerAddr:{}, New Version:{}.",
- clusterName, brokerName, brokerId,
oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
+ "Old BrokerAddr:{}, Old Version:{}, New
BrokerAddr:{}, New Version:{}.",
+ clusterName, brokerName, brokerId, oldBrokerAddr,
oldStateVersion, brokerAddr, newStateVersion);
//Remove the rejected brokerAddr from brokerLiveTable.
brokerLiveTable.remove(new BrokerAddrInfo(clusterName,
brokerAddr));
return result;
@@ -284,7 +285,7 @@ public class RouteInfoManager {
if (!brokerAddrsMap.containsKey(brokerId) &&
topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because
broker[{}]={} has not registered.",
- topicConfigWrapper.getTopicConfigTable(), brokerId,
brokerAddr);
+ topicConfigWrapper.getTopicConfigTable(), brokerId,
brokerAddr);
return null;
}
@@ -293,17 +294,17 @@ public class RouteInfoManager {
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
- && brokerId == Collections.min(brokerAddrsMap.keySet());
+ && brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
- topicConfigWrapper.getTopicConfigTable();
+ topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry :
tcTable.entrySet()) {
if (registerFirst ||
this.isTopicConfigChanged(clusterName, brokerAddr,
- topicConfigWrapper.getDataVersion(),
brokerName,
- entry.getValue().getTopicName())) {
+ topicConfigWrapper.getDataVersion(), brokerName,
+ entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
if (isPrimeSlave) {
// Wipe write perm for prime slave
@@ -331,12 +332,12 @@ public class RouteInfoManager {
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName,
brokerAddr);
BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(brokerAddrInfo,
- new BrokerLiveInfo(
- System.currentTimeMillis(),
- timeoutMillis == null ?
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- topicConfigWrapper == null ? new DataVersion() :
topicConfigWrapper.getDataVersion(),
- channel,
- haServerAddr));
+ new BrokerLiveInfo(
+ System.currentTimeMillis(),
+ timeoutMillis == null ?
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+ topicConfigWrapper == null ? new DataVersion() :
topicConfigWrapper.getDataVersion(),
+ channel,
+ haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}",
brokerAddrInfo, haServerAddr);
}
@@ -363,7 +364,7 @@ public class RouteInfoManager {
if (isMinBrokerIdChanged &&
namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
-
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
+
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
@@ -679,9 +680,9 @@ public class RouteInfoManager {
continue;
}
BrokerData brokerDataClone = new
BrokerData(brokerData.getCluster(),
- brokerData.getBrokerName(),
- (HashMap<Long, String>)
brokerData.getBrokerAddrs().clone(),
- brokerData.isEnableActingMaster(),
brokerData.getZoneName());
+ brokerData.getBrokerName(),
+ (HashMap<Long, String>)
brokerData.getBrokerAddrs().clone(),
+ brokerData.isEnableActingMaster(),
brokerData.getZoneName());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
@@ -1023,7 +1024,7 @@ public class RouteInfoManager {
String topic = topicEntry.getKey();
Map<String, QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- &&
TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
+ &&
TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
topicList.getTopicList().add(topic);
}
}
@@ -1044,7 +1045,7 @@ public class RouteInfoManager {
String topic = topicEntry.getKey();
Map<String, QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- &&
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
+ &&
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
topicList.getTopicList().add(topic);
}
}
@@ -1065,8 +1066,8 @@ public class RouteInfoManager {
String topic = topicEntry.getKey();
Map<String, QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- &&
!TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
- &&
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
+ &&
!TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
+ &&
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
topicList.getTopicList().add(topic);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index d2f447273..07d32445f 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
public class ReceiptHandleGroup {
@@ -74,7 +75,8 @@ public class ReceiptHandleGroup {
public void put(String msgID, String handle, MessageReceiptHandle value) {
long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
- Map<String, HandleData> handleMap =
receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
+ Map<String, HandleData> handleMap =
ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Map<String,
HandleData>>) this.receiptHandleMap,
+ msgID, msgIDKey -> new ConcurrentHashMap<>());
handleMap.compute(handle, (handleKey, handleData) -> {
if (handleData == null || handleData.needRemove) {
return new HandleData(value);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9bb8b7f9f..15c4385fd 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.subscription.RetryPolicy;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
@@ -239,7 +240,7 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
if (key == null) {
return;
}
- receiptHandleGroupMap.computeIfAbsent(key,
+ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key,
k -> new ReceiptHandleGroup()).put(msgID, receiptHandle,
messageReceiptHandle);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
index d730d9118..283cd823d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,7 @@ public class ChannelManager {
log.warn("ClientId is unexpected null or empty");
return createChannelInner(context);
}
-
- SimpleChannel channel = clientIdChannelMap.computeIfAbsent(clientId, k
-> createChannelInner(context));
+ SimpleChannel channel =
ConcurrentHashMapUtils.computeIfAbsent(this.clientIdChannelMap,clientId, k ->
createChannelInner(context));
channel.updateLastAccessTime();
return channel;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 74de4d691..49794c28a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.EpochEntry;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -237,7 +238,7 @@ public class AutoSwitchHAService extends DefaultHAService {
}
public void updateConnectionLastCaughtUpTime(final String slaveAddress,
final long lastCaughtUpTimeMs) {
- long prevTime =
this.connectionCaughtUpTimeTable.computeIfAbsent(slaveAddress, k -> 0L);
+ Long prevTime =
ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable,
slaveAddress, k -> 0L);
this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime,
lastCaughtUpTimeMs));
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index 55614ccba..5e87bbc03 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -20,12 +20,12 @@ package org.apache.rocketmq.store.queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* QueueOffsetAssigner is a component for assigning offsets for queues.
- *
*/
public class QueueOffsetAssigner {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -35,7 +35,7 @@ public class QueueOffsetAssigner {
private ConcurrentMap<String/* topic-queueid */, Long/* offset */>
lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
public long assignQueueOffset(String topicQueueKey, short messageNum) {
- long queueOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey,
k -> 0L);
+ Long queueOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k
-> 0L);
this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
return queueOffset;
}
@@ -45,13 +45,13 @@ public class QueueOffsetAssigner {
}
public long assignBatchQueueOffset(String topicQueueKey, short messageNum)
{
- Long topicOffset =
this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+ Long topicOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable,
topicQueueKey, k -> 0L);
this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
return topicOffset;
}
public long assignLmqOffset(String topicQueueKey, short messageNum) {
- long topicOffset =
this.lmqTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+ Long topicOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey,
k -> 0L);
this.lmqTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
return topicOffset;
}