This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 49e5c5e551 Replace ConcurrentHashMap#computeIfAbsent to avoid
potential deadlock (#15456)
49e5c5e551 is described below
commit 49e5c5e551fac97d38bc1683a7f44eb27bfd537b
Author: zrlw <[email protected]>
AuthorDate: Wed Jun 18 18:35:17 2025 +0800
Replace ConcurrentHashMap#computeIfAbsent to avoid potential deadlock
(#15456)
---
.../dubbo/common/extension/ExtensionLoader.java | 7 +++--
.../dubbo/common/serialization/ClassHolder.java | 7 ++---
.../org/apache/dubbo/common/utils/PojoUtils.java | 2 +-
.../org/apache/dubbo/common/utils/UrlUtils.java | 3 +-
.../config/context/AbstractConfigManager.java | 12 ++++----
.../apache/dubbo/rpc/model/ServiceMetadata.java | 4 +--
.../org/apache/dubbo/config/ServiceConfig.java | 6 ++--
.../support/apollo/ApolloDynamicConfiguration.java | 4 ++-
.../dubbo/metadata/AbstractServiceNameMapping.java | 10 +++----
.../metadata/store/nacos/NacosMetadataReport.java | 13 +++++----
.../dubbo/metrics/data/MethodStatComposite.java | 16 +++++-----
.../apache/dubbo/metrics/data/RtStatComposite.java | 6 ++--
.../dubbo/metrics/data/ServiceStatComposite.java | 13 +++++----
.../metrics/listener/AbstractMetricsListener.java | 10 ++++---
.../apache/dubbo/metrics/model/MetricsSupport.java | 6 ++--
.../apache/dubbo/metrics/MetricsSupportTest.java | 12 ++++----
.../collector/ConfigCenterMetricsCollector.java | 6 ++--
.../collector/AggregateMetricsCollector.java | 13 ++++-----
.../collector/sample/MetricsCountSampler.java | 4 +--
.../sample/SimpleMetricsCountSampler.java | 17 ++++++-----
.../registry/collector/RegistryStatComposite.java | 10 ++++---
.../registry/client/DefaultServiceInstance.java | 7 +++--
.../registry/client/ServiceDiscoveryRegistry.java | 6 ++--
.../listener/ServiceInstancesChangedListener.java | 7 +++--
.../registry/integration/ExporterFactory.java | 6 ++--
.../registry/integration/RegistryProtocol.java | 21 ++++++-------
.../dubbo/registry/support/AbstractRegistry.java | 7 +++--
.../registry/multicast/MulticastRegistry.java | 3 +-
.../apache/dubbo/registry/nacos/NacosRegistry.java | 20 ++++++++-----
.../zookeeper/ZookeeperServiceDiscovery.java | 34 ++++++++++++----------
.../remoting/http12/message/codec/CodecUtils.java | 12 ++++----
.../rpc/protocol/dubbo/filter/TraceFilter.java | 3 +-
.../dubbo/rpc/protocol/tri/Http3Exchanger.java | 5 ++--
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 10 ++++---
.../protocol/tri/h12/grpc/GrpcCompositeCodec.java | 9 ++++--
.../fastjson2/Fastjson2CreatorManager.java | 12 ++++----
36 files changed, 191 insertions(+), 152 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
index a1f2a945b1..f95a9f19ab 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
@@ -33,6 +33,7 @@ import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.ClassLoaderResourceLoader;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.Holder;
@@ -141,7 +142,7 @@ public class ExtensionLoader<T> {
private static final Map<String, String> specialSPILoadingStrategyMap =
getSpecialSPILoadingStrategyMap();
- private static SoftReference<Map<java.net.URL, List<String>>>
urlListMapCache =
+ private static SoftReference<ConcurrentHashMap<java.net.URL,
List<String>>> urlListMapCache =
new SoftReference<>(new ConcurrentHashMap<>());
private static final List<String> ignoredInjectMethodsDesc =
getIgnoredInjectMethodsDesc();
@@ -1189,7 +1190,7 @@ public class ExtensionLoader<T> {
}
private List<String> getResourceContent(java.net.URL resourceURL) throws
IOException {
- Map<java.net.URL, List<String>> urlListMap = urlListMapCache.get();
+ ConcurrentHashMap<java.net.URL, List<String>> urlListMap =
urlListMapCache.get();
if (urlListMap == null) {
synchronized (ExtensionLoader.class) {
if ((urlListMap = urlListMapCache.get()) == null) {
@@ -1199,7 +1200,7 @@ public class ExtensionLoader<T> {
}
}
- List<String> contentList = urlListMap.computeIfAbsent(resourceURL, key
-> {
+ List<String> contentList =
ConcurrentHashMapUtils.computeIfAbsent(urlListMap, resourceURL, key -> {
List<String> newContentList = new ArrayList<>();
try (BufferedReader reader =
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/serialization/ClassHolder.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/serialization/ClassHolder.java
index 4dd61511e4..7ada36bbab 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/serialization/ClassHolder.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/serialization/ClassHolder.java
@@ -16,18 +16,17 @@
*/
package org.apache.dubbo.common.serialization;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class ClassHolder {
- private final Map<String, Set<Class<?>>> classCache = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Set<Class<?>>> classCache = new
ConcurrentHashMap<>();
public void storeClass(Class<?> clazz) {
- classCache
- .computeIfAbsent(clazz.getName(), k -> new
ConcurrentHashSet<>())
+ ConcurrentHashMapUtils.computeIfAbsent(classCache, clazz.getName(), k
-> new ConcurrentHashSet<>())
.add(clazz);
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/PojoUtils.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/PojoUtils.java
index cdf625049b..e861c3f298 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/PojoUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/PojoUtils.java
@@ -763,7 +763,7 @@ public class PojoUtils {
if (result != null) {
ConcurrentMap<String, Field> fields =
- CLASS_FIELD_CACHE.computeIfAbsent(cls, k -> new
ConcurrentHashMap<>());
+ ConcurrentHashMapUtils.computeIfAbsent(CLASS_FIELD_CACHE,
cls, k -> new ConcurrentHashMap<>());
fields.putIfAbsent(fieldName, result);
}
return result;
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
index 56895f0da0..939945cb5d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
@@ -696,7 +696,8 @@ public class UrlUtils {
return Optional.ofNullable(url.getServiceModel())
.map(ServiceModel::getServiceMetadata)
.map(ServiceMetadata::getAttributeMap)
- .map(stringObjectMap -> (T)
stringObjectMap.computeIfAbsent(key, k -> fn.apply(url)))
+ .map(stringObjectMap ->
+ (T)
ConcurrentHashMapUtils.computeIfAbsent(stringObjectMap, key, k ->
fn.apply(url)))
.orElse(null);
}
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/context/AbstractConfigManager.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/context/AbstractConfigManager.java
index cab26b6829..8c2acd8092 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/config/context/AbstractConfigManager.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/config/context/AbstractConfigManager.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.common.context.LifecycleAdapter;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -73,9 +74,9 @@ public abstract class AbstractConfigManager extends
LifecycleAdapter {
LoggerFactory.getErrorTypeAwareLogger(AbstractConfigManager.class);
private static final Set<Class<? extends AbstractConfig>>
uniqueConfigTypes = new ConcurrentHashSet<>();
- final Map<String, Map<String, AbstractConfig>> configsCache = new
ConcurrentHashMap<>();
+ final ConcurrentHashMap<String, Map<String, AbstractConfig>> configsCache
= new ConcurrentHashMap<>();
- private final Map<String, AtomicInteger> configIdIndexes = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, AtomicInteger> configIdIndexes =
new ConcurrentHashMap<>();
protected Set<AbstractConfig> duplicatedConfigs = new
ConcurrentHashSet<>();
@@ -163,8 +164,8 @@ public abstract class AbstractConfigManager extends
LifecycleAdapter {
Class<? extends AbstractConfig> targetConfigType =
getTargetConfigType(config.getClass());
- Map<String, AbstractConfig> configsMap =
- configsCache.computeIfAbsent(getTagName(targetConfigType),
type -> new ConcurrentHashMap<>());
+ Map<String, AbstractConfig> configsMap =
ConcurrentHashMapUtils.computeIfAbsent(
+ configsCache, getTagName(targetConfigType), type -> new
ConcurrentHashMap<>());
// fast check duplicated equivalent config before write lock
if (!(config instanceof ReferenceConfigBase || config instanceof
ServiceConfigBase)) {
@@ -408,8 +409,7 @@ public abstract class AbstractConfigManager extends
LifecycleAdapter {
protected <C extends AbstractConfig> String generateConfigId(C config) {
String tagName = getTagName(config.getClass());
- int idx = configIdIndexes
- .computeIfAbsent(tagName, clazz -> new AtomicInteger(0))
+ int idx = ConcurrentHashMapUtils.computeIfAbsent(configIdIndexes,
tagName, clazz -> new AtomicInteger(0))
.incrementAndGet();
return tagName + "#" + idx;
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceMetadata.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceMetadata.java
index 7784cbfbcd..bd95b0bda3 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceMetadata.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceMetadata.java
@@ -41,7 +41,7 @@ public class ServiceMetadata extends BaseServiceMetadata {
/**
* used locally
*/
- private final Map<String, Object> attributeMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Object> attributeMap = new
ConcurrentHashMap<>();
public ServiceMetadata(String serviceInterfaceName, String group, String
version, Class<?> serviceType) {
this.serviceInterfaceName = serviceInterfaceName;
@@ -63,7 +63,7 @@ public class ServiceMetadata extends BaseServiceMetadata {
return attachments;
}
- public Map<String, Object> getAttributeMap() {
+ public ConcurrentHashMap<String, Object> getAttributeMap() {
return attributeMap;
}
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 01702c2705..278a12a35a 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -30,6 +30,7 @@ import
org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -158,7 +159,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
/**
* The exported services
*/
- private final Map<RegisterTypeEnum, List<Exporter<?>>> exporters = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<RegisterTypeEnum, List<Exporter<?>>>
exporters = new ConcurrentHashMap<>();
private final List<ServiceListener> serviceListeners = new ArrayList<>();
@@ -969,8 +970,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
- exporters
- .computeIfAbsent(registerType, k -> new
CopyOnWriteArrayList<>())
+ ConcurrentHashMapUtils.computeIfAbsent(exporters, registerType, k ->
new CopyOnWriteArrayList<>())
.add(exporter);
}
diff --git
a/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java
b/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java
index e355bb22da..3b2fcf1c92 100644
---
a/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java
+++
b/dubbo-configcenter/dubbo-configcenter-apollo/src/main/java/org/apache/dubbo/configcenter/support/apollo/ApolloDynamicConfiguration.java
@@ -23,6 +23,7 @@ import
org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.SystemPropertyConfigUtils;
import org.apache.dubbo.metrics.config.event.ConfigCenterEvent;
@@ -167,7 +168,8 @@ public class ApolloDynamicConfiguration implements
DynamicConfiguration {
*/
@Override
public void addListener(String key, String group, ConfigurationListener
listener) {
- ApolloListener apolloListener = listeners.computeIfAbsent(group + key,
k -> createTargetListener(key, group));
+ ApolloListener apolloListener =
+ ConcurrentHashMapUtils.computeIfAbsent(listeners, group + key,
k -> createTargetListener(key, group));
apolloListener.addListener(listener);
dubboConfig.addChangeListener(apolloListener,
Collections.singleton(key));
}
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
index d26684a512..9ef88b5d98 100644
---
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
@@ -21,12 +21,12 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.HashSet;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
@@ -50,7 +50,7 @@ public abstract class AbstractServiceNameMapping implements
ServiceNameMapping {
protected final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(getClass());
protected ApplicationModel applicationModel;
private final MappingCacheManager mappingCacheManager;
- private final Map<String, Set<MappingListener>> mappingListeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Set<MappingListener>>
mappingListeners = new ConcurrentHashMap<>();
// mapping lock is shared among registries of the same application.
private final ConcurrentMap<String, ReentrantLock> mappingLocks = new
ConcurrentHashMap<>();
@@ -198,7 +198,7 @@ public abstract class AbstractServiceNameMapping implements
ServiceNameMapping {
}
public Lock getMappingLock(String key) {
- return mappingLocks.computeIfAbsent(key, _k -> new ReentrantLock());
+ return ConcurrentHashMapUtils.computeIfAbsent(mappingLocks, key, _k ->
new ReentrantLock());
}
protected void removeMappingLock(String key) {
@@ -239,8 +239,8 @@ public abstract class AbstractServiceNameMapping implements
ServiceNameMapping {
String mappingKey =
ServiceNameMapping.buildMappingKey(subscribedURL);
if (listener != null) {
mappedServices = toTreeSet(getAndListen(subscribedURL,
listener));
- Set<MappingListener> listeners =
- mappingListeners.computeIfAbsent(mappingKey,
_k -> new HashSet<>());
+ Set<MappingListener> listeners =
ConcurrentHashMapUtils.computeIfAbsent(
+ mappingListeners, mappingKey, _k -> new
HashSet<>());
listeners.add(listener);
if (CollectionUtils.isNotEmpty(mappedServices)) {
if (notifyAtFirstTime) {
diff --git
a/dubbo-metadata/dubbo-metadata-report-nacos/src/main/java/org/apache/dubbo/metadata/store/nacos/NacosMetadataReport.java
b/dubbo-metadata/dubbo-metadata-report-nacos/src/main/java/org/apache/dubbo/metadata/store/nacos/NacosMetadataReport.java
index d6dac4355e..621de18ece 100644
---
a/dubbo-metadata/dubbo-metadata-report-nacos/src/main/java/org/apache/dubbo/metadata/store/nacos/NacosMetadataReport.java
+++
b/dubbo-metadata/dubbo-metadata-report-nacos/src/main/java/org/apache/dubbo/metadata/store/nacos/NacosMetadataReport.java
@@ -22,6 +22,7 @@ import
org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.MD5Utils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -79,9 +80,9 @@ public class NacosMetadataReport extends
AbstractMetadataReport {
*/
private String group;
- private Map<String, NacosConfigListener> watchListenerMap = new
ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, NacosConfigListener> watchListenerMap =
new ConcurrentHashMap<>();
- private Map<String, MappingDataListener> casListenerMap = new
ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, MappingDataListener> casListenerMap =
new ConcurrentHashMap<>();
private MD5Utils md5Utils = new MD5Utils();
@@ -336,8 +337,8 @@ public class NacosMetadataReport extends
AbstractMetadataReport {
}
private void addCasServiceMappingListener(String serviceKey, String group,
MappingListener listener) {
- MappingDataListener mappingDataListener =
casListenerMap.computeIfAbsent(
- buildListenerKey(serviceKey, group), k -> new
MappingDataListener(serviceKey, group));
+ MappingDataListener mappingDataListener =
ConcurrentHashMapUtils.computeIfAbsent(
+ casListenerMap, buildListenerKey(serviceKey, group), k -> new
MappingDataListener(serviceKey, group));
mappingDataListener.addListeners(listener);
addListener(serviceKey, DEFAULT_MAPPING_GROUP, mappingDataListener);
}
@@ -355,8 +356,8 @@ public class NacosMetadataReport extends
AbstractMetadataReport {
public void addListener(String key, String group, ConfigurationListener
listener) {
String listenerKey = buildListenerKey(key, group);
- NacosConfigListener nacosConfigListener =
- watchListenerMap.computeIfAbsent(listenerKey, k ->
createTargetListener(key, group));
+ NacosConfigListener nacosConfigListener =
ConcurrentHashMapUtils.computeIfAbsent(
+ watchListenerMap, listenerKey, k -> createTargetListener(key,
group));
nacosConfigListener.addListener(listener);
try {
configService.addListener(key, group, nacosConfigListener);
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
index b9717a5b84..8661d48a61 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.metrics.data;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.exception.MetricsNeverHappenException;
import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.metrics.model.MetricsCategory;
@@ -48,7 +49,8 @@ public class MethodStatComposite extends
AbstractMetricsExport {
super(applicationModel);
}
- private final Map<MetricsKeyWrapper, Map<MethodMetric, AtomicLong>>
methodNumStats = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<MetricsKeyWrapper,
ConcurrentHashMap<MethodMetric, AtomicLong>> methodNumStats =
+ new ConcurrentHashMap<>();
public void initWrapper(List<MetricsKeyWrapper> metricsKeyWrappers) {
if (CollectionUtils.isEmpty(metricsKeyWrappers)) {
@@ -65,11 +67,10 @@ public class MethodStatComposite extends
AbstractMetricsExport {
return;
}
- methodNumStats
- .get(wrapper)
- .computeIfAbsent(
- new MethodMetric(getApplicationModel(), invocation,
getServiceLevel()),
- k -> new AtomicLong(0L));
+ ConcurrentHashMapUtils.computeIfAbsent(
+ methodNumStats.get(wrapper),
+ new MethodMetric(getApplicationModel(), invocation,
getServiceLevel()),
+ k -> new AtomicLong(0L));
samplesChanged.set(true);
}
@@ -79,7 +80,8 @@ public class MethodStatComposite extends
AbstractMetricsExport {
}
AtomicLong stat = methodNumStats.get(wrapper).get(methodMetric);
if (stat == null) {
- methodNumStats.get(wrapper).computeIfAbsent(methodMetric, (k) ->
new AtomicLong(0L));
+ ConcurrentHashMapUtils.computeIfAbsent(
+ methodNumStats.get(wrapper), methodMetric, (k) -> new
AtomicLong(0L));
samplesChanged.set(true);
stat = methodNumStats.get(wrapper).get(methodMetric);
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
index 729afdacde..3f551db8c2 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.metrics.data;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.metrics.model.Metric;
import org.apache.dubbo.metrics.model.MetricsCategory;
@@ -57,7 +58,7 @@ public class RtStatComposite extends AbstractMetricsExport {
super(applicationModel);
}
- private final Map<String, List<LongContainer<? extends Number>>> rtStats =
new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, List<LongContainer<? extends
Number>>> rtStats = new ConcurrentHashMap<>();
public void init(MetricsPlaceValue... placeValues) {
if (placeValues == null) {
@@ -66,7 +67,8 @@ public class RtStatComposite extends AbstractMetricsExport {
for (MetricsPlaceValue placeValue : placeValues) {
List<LongContainer<? extends Number>> containers =
initStats(placeValue);
for (LongContainer<? extends Number> container : containers) {
-
rtStats.computeIfAbsent(container.getMetricsKeyWrapper().getType(), k -> new
ArrayList<>())
+ ConcurrentHashMapUtils.computeIfAbsent(
+ rtStats,
container.getMetricsKeyWrapper().getType(), k -> new ArrayList<>())
.add(container);
}
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
index 0a63ad87ee..b55c90abdb 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.metrics.data;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.ServiceKeyMetric;
import org.apache.dubbo.metrics.model.key.MetricsKeyWrapper;
@@ -45,8 +46,8 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
super(applicationModel);
}
- private final Map<MetricsKeyWrapper, Map<ServiceKeyMetric, AtomicLong>>
serviceWrapperNumStats =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<MetricsKeyWrapper,
ConcurrentHashMap<ServiceKeyMetric, AtomicLong>>
+ serviceWrapperNumStats = new ConcurrentHashMap<>();
public void initWrapper(List<MetricsKeyWrapper> metricsKeyWrappers) {
if (CollectionUtils.isEmpty(metricsKeyWrappers)) {
@@ -71,10 +72,10 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
- Map<ServiceKeyMetric, AtomicLong> map =
serviceWrapperNumStats.get(wrapper);
+ ConcurrentHashMap<ServiceKeyMetric, AtomicLong> map =
serviceWrapperNumStats.get(wrapper);
AtomicLong metrics = map.get(serviceKeyMetric);
if (metrics == null) {
- metrics = map.computeIfAbsent(serviceKeyMetric, k -> new
AtomicLong(0L));
+ metrics = ConcurrentHashMapUtils.computeIfAbsent(map,
serviceKeyMetric, k -> new AtomicLong(0L));
samplesChanged.set(true);
}
metrics.getAndAdd(size);
@@ -93,10 +94,10 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
- Map<ServiceKeyMetric, AtomicLong> stats =
serviceWrapperNumStats.get(wrapper);
+ ConcurrentHashMap<ServiceKeyMetric, AtomicLong> stats =
serviceWrapperNumStats.get(wrapper);
AtomicLong metrics = stats.get(serviceKeyMetric);
if (metrics == null) {
- metrics = stats.computeIfAbsent(serviceKeyMetric, k -> new
AtomicLong(0L));
+ metrics = ConcurrentHashMapUtils.computeIfAbsent(stats,
serviceKeyMetric, k -> new AtomicLong(0L));
samplesChanged.set(true);
}
metrics.set(num);
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/listener/AbstractMetricsListener.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/listener/AbstractMetricsListener.java
index dce4a8b207..865b72ce97 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/listener/AbstractMetricsListener.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/listener/AbstractMetricsListener.java
@@ -16,22 +16,24 @@
*/
package org.apache.dubbo.metrics.listener;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ReflectionUtils;
import org.apache.dubbo.metrics.event.MetricsEvent;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public abstract class AbstractMetricsListener<E extends MetricsEvent>
implements MetricsListener<E> {
- private final Map<Class<?>, Boolean> eventMatchCache = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Class<?>, Boolean> eventMatchCache = new
ConcurrentHashMap<>();
/**
* Whether to support the general determination of event points depends on
the event type
*/
public boolean isSupport(MetricsEvent event) {
- Boolean eventMatch = eventMatchCache.computeIfAbsent(
- event.getClass(), clazz -> ReflectionUtils.match(getClass(),
AbstractMetricsListener.class, event));
+ Boolean eventMatch = ConcurrentHashMapUtils.computeIfAbsent(
+ eventMatchCache,
+ event.getClass(),
+ clazz -> ReflectionUtils.match(getClass(),
AbstractMetricsListener.class, event));
return event.isAvailable() && eventMatch;
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsSupport.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsSupport.java
index dc7b9e5aa2..0e5ad7d44d 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsSupport.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsSupport.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.metrics.model;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.lang.Nullable;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.collector.MethodMetricsCollector;
import org.apache.dubbo.metrics.collector.ServiceMetricsCollector;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -304,7 +306,7 @@ public class MetricsSupport {
/**
* Generate a complete indicator item for an interface/method
*/
- public static <T> void fillZero(Map<?, Map<T, AtomicLong>> data) {
+ public static <T> void fillZero(ConcurrentHashMap<?, ConcurrentHashMap<T,
AtomicLong>> data) {
if (CollectionUtils.isEmptyMap(data)) {
return;
}
@@ -312,7 +314,7 @@ public class MetricsSupport {
data.values().stream().flatMap(map ->
map.keySet().stream()).collect(Collectors.toSet());
data.forEach((keyWrapper, mapVal) -> {
for (T key : allKeyMetrics) {
- mapVal.computeIfAbsent(key, k -> new AtomicLong(0));
+ ConcurrentHashMapUtils.computeIfAbsent(mapVal, key, k -> new
AtomicLong(0));
}
});
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/MetricsSupportTest.java
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/MetricsSupportTest.java
index 5ea95fdff7..76a439b51b 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/MetricsSupportTest.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/MetricsSupportTest.java
@@ -26,8 +26,7 @@ import org.apache.dubbo.metrics.model.key.MetricsPlaceValue;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
@@ -44,16 +43,17 @@ public class MetricsSupportTest {
config.setName("MockMetrics");
applicationModel.getApplicationConfigManager().setApplication(config);
- Map<MetricsKeyWrapper, Map<ServiceKeyMetric, AtomicLong>> data = new
HashMap<>();
+ ConcurrentHashMap<MetricsKeyWrapper,
ConcurrentHashMap<ServiceKeyMetric, AtomicLong>> data =
+ new ConcurrentHashMap<>();
MetricsKeyWrapper key1 = new MetricsKeyWrapper(
METRIC_REQUESTS,
MetricsPlaceValue.of(CommonConstants.PROVIDER, MetricsLevel.METHOD));
MetricsKeyWrapper key2 = new MetricsKeyWrapper(
METRIC_REQUESTS,
MetricsPlaceValue.of(CommonConstants.CONSUMER, MetricsLevel.METHOD));
ServiceKeyMetric sm1 = new ServiceKeyMetric(applicationModel, "a.b.c");
ServiceKeyMetric sm2 = new ServiceKeyMetric(applicationModel, "a.b.d");
- data.computeIfAbsent(key1, k -> new HashMap<>()).put(sm1, new
AtomicLong(1));
- data.computeIfAbsent(key1, k -> new HashMap<>()).put(sm2, new
AtomicLong(1));
- data.put(key2, new HashMap<>());
+ data.computeIfAbsent(key1, k -> new ConcurrentHashMap<>()).put(sm1,
new AtomicLong(1));
+ data.computeIfAbsent(key1, k -> new ConcurrentHashMap<>()).put(sm2,
new AtomicLong(1));
+ data.put(key2, new ConcurrentHashMap<>());
Assertions.assertEquals(
2, data.values().stream().mapToLong(map ->
map.values().size()).sum());
MetricsSupport.fillZero(data);
diff --git
a/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
index 66ce47b6ef..6c6536e2d9 100644
---
a/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.metrics.config.collector;
import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.metrics.collector.CombMetricsCollector;
import org.apache.dubbo.metrics.collector.MetricsCollector;
@@ -30,7 +31,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,7 +48,7 @@ public class ConfigCenterMetricsCollector extends
CombMetricsCollector<ConfigCen
private final ApplicationModel applicationModel;
private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
- private final Map<ConfigCenterMetric, AtomicLong> updatedMetrics = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ConfigCenterMetric, AtomicLong>
updatedMetrics = new ConcurrentHashMap<>();
public ConfigCenterMetricsCollector(ApplicationModel applicationModel) {
super(null);
@@ -79,7 +79,7 @@ public class ConfigCenterMetricsCollector extends
CombMetricsCollector<ConfigCen
new ConfigCenterMetric(applicationModel.getApplicationName(),
key, group, protocol, changeTypeName);
AtomicLong metrics = updatedMetrics.get(metric);
if (metrics == null) {
- metrics = updatedMetrics.computeIfAbsent(metric, k -> new
AtomicLong(0L));
+ metrics = ConcurrentHashMapUtils.computeIfAbsent(updatedMetrics,
metric, k -> new AtomicLong(0L));
samplesChanged.set(true);
}
metrics.addAndGet(size);
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
index eccef2a24e..cf9ccb1652 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
@@ -38,7 +38,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -61,8 +60,8 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
private int bucketNum = DEFAULT_BUCKET_NUM;
private int timeWindowSeconds = DEFAULT_TIME_WINDOW_SECONDS;
private int qpsTimeWindowMillSeconds =
DEFAULT_QPS_TIME_WINDOW_MILL_SECONDS;
- private final Map<MetricsKeyWrapper, ConcurrentHashMap<MethodMetric,
TimeWindowCounter>> methodTypeCounter =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<MetricsKeyWrapper,
ConcurrentHashMap<MethodMetric, TimeWindowCounter>>
+ methodTypeCounter = new ConcurrentHashMap<>();
private final ConcurrentMap<MethodMetric, TimeWindowQuantile> rt = new
ConcurrentHashMap<>();
private final ConcurrentHashMap<MethodMetric, TimeWindowCounter> qps = new
ConcurrentHashMap<>();
private final ApplicationModel applicationModel;
@@ -204,8 +203,8 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
MethodMetric metric =
new MethodMetric(applicationModel,
event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);
- ConcurrentMap<MethodMetric, TimeWindowCounter> counter =
- methodTypeCounter.computeIfAbsent(metricsKeyWrapper, k -> new
ConcurrentHashMap<>());
+ ConcurrentMap<MethodMetric, TimeWindowCounter> counter =
ConcurrentHashMapUtils.computeIfAbsent(
+ methodTypeCounter, metricsKeyWrapper, k -> new
ConcurrentHashMap<>());
TimeWindowCounter windowCounter = counter.get(metric);
if (windowCounter == null) {
@@ -391,8 +390,8 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
MethodMetric metric =
new MethodMetric(applicationModel,
event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);
- ConcurrentMap<MethodMetric, TimeWindowCounter> counter =
- methodTypeCounter.computeIfAbsent(metricsKeyWrapper, k -> new
ConcurrentHashMap<>());
+ ConcurrentMap<MethodMetric, TimeWindowCounter> counter =
ConcurrentHashMapUtils.computeIfAbsent(
+ methodTypeCounter, metricsKeyWrapper, k -> new
ConcurrentHashMap<>());
ConcurrentHashMapUtils.computeIfAbsent(
counter, metric, methodMetric -> new
TimeWindowCounter(bucketNum, timeWindowSeconds));
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsCountSampler.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsCountSampler.java
index 4b02dcd365..49cbb87747 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsCountSampler.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsCountSampler.java
@@ -19,12 +19,12 @@ package org.apache.dubbo.metrics.collector.sample;
import org.apache.dubbo.metrics.model.Metric;
import java.util.Optional;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public interface MetricsCountSampler<S, K, M extends Metric> extends
MetricsSampler {
void inc(S source, K metricName);
- Optional<ConcurrentMap<M, AtomicLong>> getCount(K metricName);
+ Optional<ConcurrentHashMap<M, AtomicLong>> getCount(K metricName);
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java
index 9944b0aca6..487cad0ba3 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java
@@ -17,12 +17,11 @@
package org.apache.dubbo.metrics.collector.sample;
import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.model.Metric;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -32,9 +31,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public abstract class SimpleMetricsCountSampler<S, K, M extends Metric>
implements MetricsCountSampler<S, K, M> {
- private final ConcurrentMap<M, AtomicLong> EMPTY_COUNT = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<M, AtomicLong> EMPTY_COUNT = new
ConcurrentHashMap<>();
- private final Map<K, ConcurrentMap<M, AtomicLong>> metricCounter = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<K, ConcurrentHashMap<M, AtomicLong>>
metricCounter = new ConcurrentHashMap<>();
@Override
public void inc(S source, K metricName) {
@@ -42,7 +41,7 @@ public abstract class SimpleMetricsCountSampler<S, K, M
extends Metric> implemen
}
@Override
- public Optional<ConcurrentMap<M, AtomicLong>> getCount(K metricName) {
+ public Optional<ConcurrentHashMap<M, AtomicLong>> getCount(K metricName) {
return Optional.ofNullable(metricCounter.get(metricName) == null ?
EMPTY_COUNT : metricCounter.get(metricName));
}
@@ -55,10 +54,11 @@ public abstract class SimpleMetricsCountSampler<S, K, M
extends Metric> implemen
this.countConfigure(sampleConfigure);
- Map<M, AtomicLong> metricAtomic = metricCounter.get(metricsName);
+ ConcurrentHashMap<M, AtomicLong> metricAtomic =
metricCounter.get(metricsName);
if (metricAtomic == null) {
- metricAtomic = metricCounter.computeIfAbsent(metricsName, k -> new
ConcurrentHashMap<>());
+ metricAtomic =
+ ConcurrentHashMapUtils.computeIfAbsent(metricCounter,
metricsName, k -> new ConcurrentHashMap<>());
}
Assert.notNull(sampleConfigure.getMetric(), "metrics is null");
@@ -66,7 +66,8 @@ public abstract class SimpleMetricsCountSampler<S, K, M
extends Metric> implemen
AtomicLong atomicCounter =
metricAtomic.get(sampleConfigure.getMetric());
if (atomicCounter == null) {
- atomicCounter =
metricAtomic.computeIfAbsent(sampleConfigure.getMetric(), k -> new
AtomicLong());
+ atomicCounter = ConcurrentHashMapUtils.computeIfAbsent(
+ metricAtomic, sampleConfigure.getMetric(), k -> new
AtomicLong());
}
return atomicCounter;
}
diff --git
a/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
b/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
index 567b6d6ee0..9064bea8ae 100644
---
a/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.metrics.registry.collector;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.model.ApplicationMetric;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.MetricsSupport;
@@ -40,7 +41,8 @@ import static
org.apache.dubbo.metrics.MetricsConstants.SELF_INCREMENT_SIZE;
public class RegistryStatComposite extends AbstractMetricsExport {
- private final Map<MetricsKey, Map<ApplicationMetric, AtomicLong>> appStats
= new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<MetricsKey,
ConcurrentHashMap<ApplicationMetric, AtomicLong>> appStats =
+ new ConcurrentHashMap<>();
private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
@@ -81,17 +83,17 @@ public class RegistryStatComposite extends
AbstractMetricsExport {
ApplicationMetric applicationMetric = new
ApplicationMetric(getApplicationModel());
applicationMetric.setExtraInfo(
Collections.singletonMap(RegistryConstants.REGISTRY_CLUSTER_KEY.toLowerCase(),
name));
- Map<ApplicationMetric, AtomicLong> stats = appStats.get(metricsKey);
+ ConcurrentHashMap<ApplicationMetric, AtomicLong> stats =
appStats.get(metricsKey);
AtomicLong metrics = stats.get(applicationMetric);
if (metrics == null) {
- metrics = stats.computeIfAbsent(applicationMetric, k -> new
AtomicLong(0L));
+ metrics = ConcurrentHashMapUtils.computeIfAbsent(stats,
applicationMetric, k -> new AtomicLong(0L));
samplesChanged.set(true);
}
metrics.getAndAdd(SELF_INCREMENT_SIZE);
MetricsSupport.fillZero(appStats);
}
- public Map<MetricsKey, Map<ApplicationMetric, AtomicLong>> getAppStats() {
+ public ConcurrentHashMap<MetricsKey, ConcurrentHashMap<ApplicationMetric,
AtomicLong>> getAppStats() {
return appStats;
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 191636e19a..6723c6daa9 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -73,7 +74,7 @@ public class DefaultServiceInstance implements
ServiceInstance {
private transient List<Endpoint> endpoints;
private transient ApplicationModel applicationModel;
- private transient Map<String, InstanceAddressURL> instanceAddressURL = new
ConcurrentHashMap<>();
+ private transient ConcurrentHashMap<String, InstanceAddressURL>
instanceAddressURL = new ConcurrentHashMap<>();
public DefaultServiceInstance() {}
@@ -291,8 +292,8 @@ public class DefaultServiceInstance implements
ServiceInstance {
@Override
public InstanceAddressURL toURL(String protocol) {
- return instanceAddressURL.computeIfAbsent(
- protocol, key -> new InstanceAddressURL(this, serviceMetadata,
protocol));
+ return ConcurrentHashMapUtils.computeIfAbsent(
+ instanceAddressURL, protocol, key -> new
InstanceAddressURL(this, serviceMetadata, protocol));
}
@Override
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index f84255abeb..074c7fe61e 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -82,7 +82,7 @@ public class ServiceDiscoveryRegistry extends
FailbackRegistry {
/* apps - listener */
private final Map<String, ServiceInstancesChangedListener>
serviceListeners = new ConcurrentHashMap<>();
- private final Map<String, Set<MappingListener>> mappingListeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Set<MappingListener>>
mappingListeners = new ConcurrentHashMap<>();
/* This lock has the same scope and lifecycle as its corresponding
instance listener.
It's used to make sure that only one interface mapping to the same app
list can do subscribe or unsubscribe at the same moment.
And the lock should be destroyed when listener destroying its
corresponding instance listener.
@@ -218,8 +218,8 @@ public class ServiceDiscoveryRegistry extends
FailbackRegistry {
// it's protected by the mapping lock, so it won't
override the event value.
mappingListener.updateInitialApps(mappingByUrl);
synchronized (mappingListeners) {
- mappingListeners
- .computeIfAbsent(url.getProtocolServiceKey(),
(k) -> new ConcurrentHashSet<>())
+ ConcurrentHashMapUtils.computeIfAbsent(
+ mappingListeners,
url.getProtocolServiceKey(), (k) -> new ConcurrentHashSet<>())
.add(mappingListener);
}
} catch (Exception e) {
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 78362506dd..6d3a254c63 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
@@ -77,7 +78,7 @@ public class ServiceInstancesChangedListener {
protected final Set<String> serviceNames;
protected final ServiceDiscovery serviceDiscovery;
- protected Map<String, Set<NotifyListenerWithKey>> listeners;
+ protected ConcurrentHashMap<String, Set<NotifyListenerWithKey>> listeners;
protected AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -254,8 +255,8 @@ public class ServiceInstancesChangedListener {
return;
}
- Set<NotifyListenerWithKey> notifyListeners =
- this.listeners.computeIfAbsent(url.getServiceKey(), _k -> new
ConcurrentHashSet<>());
+ Set<NotifyListenerWithKey> notifyListeners =
ConcurrentHashMapUtils.computeIfAbsent(
+ this.listeners, url.getServiceKey(), _k -> new
ConcurrentHashSet<>());
String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY,
url.getProtocol());
ProtocolServiceKey protocolServiceKey = new ProtocolServiceKey(
url.getServiceInterface(),
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/ExporterFactory.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/ExporterFactory.java
index f9d89ebf1a..508d5538b9 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/ExporterFactory.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/ExporterFactory.java
@@ -16,17 +16,17 @@
*/
package org.apache.dubbo.registry.integration;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.rpc.Exporter;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
public class ExporterFactory {
- private final Map<String, ReferenceCountExporter<?>> exporters = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ReferenceCountExporter<?>>
exporters = new ConcurrentHashMap<>();
protected ReferenceCountExporter<?> createExporter(String providerKey,
Callable<Exporter<?>> exporterProducer) {
- return exporters.computeIfAbsent(providerKey, key -> {
+ return ConcurrentHashMapUtils.computeIfAbsent(exporters, providerKey,
key -> {
try {
return new ReferenceCountExporter<>(exporterProducer.call(),
key, this);
} catch (Exception e) {
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 17ae1800cd..84b99e2870 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -27,6 +27,7 @@ import
org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
@@ -181,7 +182,8 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
// To solve the problem of RMI repeated exposure port conflicts, the
services that have been exposed are no longer
// exposed.
// provider url <--> registry url <--> exporter
- private final Map<String, Map<String, ExporterChangeableWrapper<?>>>
bounds = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String,
ExporterChangeableWrapper<?>>> bounds =
+ new ConcurrentHashMap<>();
protected Protocol protocol;
protected ProxyFactory proxyFactory;
@@ -278,10 +280,9 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new
OverrideListener(overrideSubscribeUrl, originInvoker);
- Map<URL, Set<NotifyListener>> overrideListeners =
+ ConcurrentHashMap<URL, Set<NotifyListener>> overrideListeners =
getProviderConfigurationListener(overrideSubscribeUrl).getOverrideListeners();
- overrideListeners
- .computeIfAbsent(overrideSubscribeUrl, k -> new
ConcurrentHashSet<>())
+ ConcurrentHashMapUtils.computeIfAbsent(overrideListeners,
overrideSubscribeUrl, k -> new ConcurrentHashSet<>())
.add(overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl,
overrideSubscribeListener);
@@ -352,10 +353,10 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
ReferenceCountExporter<?> exporter =
exporterFactory.createExporter(providerUrlKey, () ->
protocol.export(invokerDelegate));
- return (ExporterChangeableWrapper<T>)
bounds.computeIfAbsent(providerUrlKey, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(
- registryUrlKey,
- s -> new
ExporterChangeableWrapper<>((ReferenceCountExporter<T>) exporter,
originInvoker));
+ return (ExporterChangeableWrapper<T>)
ConcurrentHashMapUtils.computeIfAbsent(
+ ConcurrentHashMapUtils.computeIfAbsent(bounds, providerUrlKey,
k -> new ConcurrentHashMap<>()),
+ registryUrlKey,
+ s -> new
ExporterChangeableWrapper<>((ReferenceCountExporter<T>) exporter,
originInvoker));
}
public <T> void reExport(Exporter<T> exporter, URL newInvokerUrl) {
@@ -972,7 +973,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
private class ProviderConfigurationListener extends
AbstractConfiguratorListener {
- private final Map<URL, Set<NotifyListener>> overrideListeners = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<URL, Set<NotifyListener>>
overrideListeners = new ConcurrentHashMap<>();
private final ModuleModel moduleModel;
@@ -1011,7 +1012,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
}
}
- public Map<URL, Set<NotifyListener>> getOverrideListeners() {
+ public ConcurrentHashMap<URL, Set<NotifyListener>>
getOverrideListeners() {
return overrideListeners;
}
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index 27983bf958..98623bf45f 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -452,7 +453,8 @@ public abstract class AbstractRegistry implements Registry {
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
- Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n ->
new ConcurrentHashSet<>());
+ Set<NotifyListener> listeners =
+ ConcurrentHashMapUtils.computeIfAbsent(subscribed, url, n ->
new ConcurrentHashSet<>());
listeners.add(listener);
}
@@ -567,7 +569,8 @@ public abstract class AbstractRegistry implements Registry {
if (result.size() == 0) {
return;
}
- Map<String, List<URL>> categoryNotified =
notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
+ Map<String, List<URL>> categoryNotified =
+ ConcurrentHashMapUtils.computeIfAbsent(notified, url, u -> new
ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
diff --git
a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
index 0ca952e6d8..1575cc6853 100644
---
a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
+++
b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
@@ -345,7 +346,7 @@ public class MulticastRegistry extends FailbackRegistry {
for (Map.Entry<URL, Set<NotifyListener>> entry :
getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
- Set<URL> urls = received.computeIfAbsent(key, k -> new
ConcurrentHashSet<>());
+ Set<URL> urls =
ConcurrentHashMapUtils.computeIfAbsent(received, key, k -> new
ConcurrentHashSet<>());
urls.add(url);
List<URL> list = toList(urls);
for (final NotifyListener listener : entry.getValue()) {
diff --git
a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
index 573affda2b..fa8d64c756 100644
---
a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
+++
b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.url.component.DubboServiceAddressURL;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.SystemPropertyConfigUtils;
import org.apache.dubbo.common.utils.UrlUtils;
@@ -137,8 +138,9 @@ public class NacosRegistry extends FailbackRegistry {
private final Map<URL, Map<NotifyListener, NacosAggregateListener>>
originToAggregateListener =
new ConcurrentHashMap<>();
- private final Map<URL, Map<NacosAggregateListener, Map<String,
EventListener>>> nacosListeners =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<
+ URL, ConcurrentHashMap<NacosAggregateListener,
ConcurrentHashMap<String, EventListener>>>
+ nacosListeners = new ConcurrentHashMap<>();
private final boolean supportLegacyServiceName;
public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
@@ -642,20 +644,22 @@ public class NacosRegistry extends FailbackRegistry {
private void subscribeEventListener(String serviceName, final URL url,
final NacosAggregateListener listener)
throws NacosException {
- Map<NacosAggregateListener, Map<String, EventListener>> listeners =
- nacosListeners.computeIfAbsent(url, k -> new
ConcurrentHashMap<>());
+ ConcurrentHashMap<NacosAggregateListener, ConcurrentHashMap<String,
EventListener>> listeners =
+ ConcurrentHashMapUtils.computeIfAbsent(nacosListeners, url, k
-> new ConcurrentHashMap<>());
- Map<String, EventListener> eventListeners =
listeners.computeIfAbsent(listener, k -> new ConcurrentHashMap<>());
+ ConcurrentHashMap<String, EventListener> eventListeners =
+ ConcurrentHashMapUtils.computeIfAbsent(listeners, listener, k
-> new ConcurrentHashMap<>());
- EventListener eventListener = eventListeners.computeIfAbsent(
- serviceName, k -> new RegistryChildListenerImpl(serviceName,
url, listener));
+ EventListener eventListener = ConcurrentHashMapUtils.computeIfAbsent(
+ eventListeners, serviceName, k -> new
RegistryChildListenerImpl(serviceName, url, listener));
namingService.subscribe(serviceName,
getUrl().getGroup(Constants.DEFAULT_GROUP), eventListener);
}
private void unsubscribeEventListener(String serviceName, final URL url,
final NacosAggregateListener listener)
throws NacosException {
- Map<NacosAggregateListener, Map<String, EventListener>>
listenerToServiceEvent = nacosListeners.get(url);
+ ConcurrentHashMap<NacosAggregateListener, ConcurrentHashMap<String,
EventListener>> listenerToServiceEvent =
+ nacosListeners.get(url);
if (listenerToServiceEvent == null) {
return;
}
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 06c2305055..61dbce3a92 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
@@ -33,7 +34,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -75,7 +75,8 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
/**
* The Key is watched Zookeeper path, the value is an instance of {@link
CuratorWatcher}
*/
- private final Map<String, ZookeeperServiceDiscoveryChangeWatcher>
watcherCaches = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String,
ZookeeperServiceDiscoveryChangeWatcher> watcherCaches =
+ new ConcurrentHashMap<>();
public ZookeeperServiceDiscovery(ApplicationModel applicationModel, URL
registryURL) {
super(applicationModel, registryURL);
@@ -210,21 +211,22 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
protected void registerServiceWatcher(String serviceName,
ServiceInstancesChangedListener listener) {
CountDownLatch latch = new CountDownLatch(1);
- ZookeeperServiceDiscoveryChangeWatcher watcher =
watcherCaches.computeIfAbsent(serviceName, name -> {
- ServiceCache<ZookeeperInstance> serviceCache =
- serviceDiscovery.serviceCacheBuilder().name(name).build();
- ZookeeperServiceDiscoveryChangeWatcher newer =
- new ZookeeperServiceDiscoveryChangeWatcher(this,
serviceCache, name, latch);
- serviceCache.addListener(newer);
-
- try {
- serviceCache.start();
- } catch (Exception e) {
- throw new RpcException(REGISTRY_EXCEPTION, "Failed subscribe
service: " + name, e);
- }
+ ZookeeperServiceDiscoveryChangeWatcher watcher =
+ ConcurrentHashMapUtils.computeIfAbsent(watcherCaches,
serviceName, name -> {
+ ServiceCache<ZookeeperInstance> serviceCache =
+
serviceDiscovery.serviceCacheBuilder().name(name).build();
+ ZookeeperServiceDiscoveryChangeWatcher newer =
+ new ZookeeperServiceDiscoveryChangeWatcher(this,
serviceCache, name, latch);
+ serviceCache.addListener(newer);
- return newer;
- });
+ try {
+ serviceCache.start();
+ } catch (Exception e) {
+ throw new RpcException(REGISTRY_EXCEPTION, "Failed
subscribe service: " + name, e);
+ }
+
+ return newer;
+ });
watcher.addListener(listener);
listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
this.getInstances(serviceName)));
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/CodecUtils.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/CodecUtils.java
index 143857e5fa..389795328b 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/CodecUtils.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/CodecUtils.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
import
org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder;
@@ -32,7 +33,6 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,8 +42,10 @@ public final class CodecUtils {
private final FrameworkModel frameworkModel;
private final List<HttpMessageDecoderFactory> decoderFactories;
private final List<HttpMessageEncoderFactory> encoderFactories;
- private final Map<String, Optional<HttpMessageEncoderFactory>>
encoderCache = new ConcurrentHashMap<>();
- private final Map<String, Optional<HttpMessageDecoderFactory>>
decoderCache = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String,
Optional<HttpMessageEncoderFactory>> encoderCache =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String,
Optional<HttpMessageDecoderFactory>> decoderCache =
+ new ConcurrentHashMap<>();
private Set<String> disallowedContentTypes = Collections.emptySet();
public CodecUtils(FrameworkModel frameworkModel) {
@@ -80,7 +82,7 @@ public final class CodecUtils {
public Optional<HttpMessageDecoderFactory>
determineHttpMessageDecoderFactory(String mediaType) {
Assert.notNull(mediaType, "mediaType must not be null");
- return decoderCache.computeIfAbsent(mediaType, k -> {
+ return ConcurrentHashMapUtils.computeIfAbsent(decoderCache, mediaType,
k -> {
for (HttpMessageDecoderFactory factory : decoderFactories) {
if (factory.supports(k)
&&
!disallowedContentTypes.contains(factory.mediaType().getName())) {
@@ -93,7 +95,7 @@ public final class CodecUtils {
public Optional<HttpMessageEncoderFactory>
determineHttpMessageEncoderFactory(String mediaType) {
Assert.notNull(mediaType, "mediaType must not be null");
- return encoderCache.computeIfAbsent(mediaType, k -> {
+ return ConcurrentHashMapUtils.computeIfAbsent(encoderCache, mediaType,
k -> {
for (HttpMessageEncoderFactory factory : encoderFactories) {
if (factory.supports(k)
&&
!disallowedContentTypes.contains(factory.mediaType().getName())) {
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
index b856e8d57b..5436eeb495 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -60,7 +61,7 @@ public class TraceFilter implements Filter {
channel.setAttribute(TRACE_MAX, max);
channel.setAttribute(TRACE_COUNT, new AtomicInteger());
String key = StringUtils.isNotEmpty(method) ? type.getName() + "." +
method : type.getName();
- Set<Channel> channels = TRACERS.computeIfAbsent(key, k -> new
ConcurrentHashSet<>());
+ Set<Channel> channels =
ConcurrentHashMapUtils.computeIfAbsent(TRACERS, key, k -> new
ConcurrentHashSet<>());
channels.add(channel);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
index 3394a81295..3bea1f4335 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.FluentLogger;
import org.apache.dubbo.common.utils.ClassUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
@@ -56,7 +57,7 @@ public final class Http3Exchanger {
private static final FluentLogger LOGGER =
FluentLogger.of(Http3Exchanger.class);
private static final boolean HAS_NETTY_HTTP3 =
ClassUtils.isPresent("io.netty.incubator.codec.http3.Http3");
- private static final Map<String, RemotingServer> SERVERS = new
ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<String, RemotingServer> SERVERS =
new ConcurrentHashMap<>();
private static final Map<String, AbstractConnectionClient> CLIENTS = new
ConcurrentHashMap<>(16);
private static final ChannelHandler HANDLER = new ChannelHandlerAdapter();
@@ -80,7 +81,7 @@ public final class Http3Exchanger {
public static RemotingServer bind(URL url) {
if (isEnabled(url)) {
- return SERVERS.computeIfAbsent(url.getAddress(), addr -> {
+ return ConcurrentHashMapUtils.computeIfAbsent(SERVERS,
url.getAddress(), addr -> {
try {
URL serverUrl =
url.putAttribute(PIPELINE_CONFIGURATOR_KEY, configServerPipeline(url));
return new NettyHttp3Server(serverUrl, HANDLER);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 5490af698a..1a6bac6073 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.SystemPropertyConfigUtils;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.rpc.AppResponse;
@@ -60,7 +61,6 @@ import
org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Arrays;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -100,7 +100,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
private static final boolean setFutureWhenSync =
Boolean.parseBoolean(SystemPropertyConfigUtils.getSystemProperty(
CommonConstants.ThirdPartyProperty.SET_FUTURE_IN_SYNC_MODE,
"true"));
private final PackableMethodFactory packableMethodFactory;
- private final Map<MethodDescriptor, PackableMethod> packableMethodCache =
new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<MethodDescriptor, PackableMethod>
packableMethodCache = new ConcurrentHashMap<>();
private static Compressor compressor;
public TripleInvoker(
@@ -313,8 +313,10 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
if (methodDescriptor instanceof PackableMethod) {
meta.packableMethod = (PackableMethod) methodDescriptor;
} else {
- meta.packableMethod = packableMethodCache.computeIfAbsent(
- methodDescriptor, (md) -> packableMethodFactory.create(md,
url, APPLICATION_GRPC_PROTO.getName()));
+ meta.packableMethod = ConcurrentHashMapUtils.computeIfAbsent(
+ packableMethodCache,
+ methodDescriptor,
+ (md) -> packableMethodFactory.create(md, url,
APPLICATION_GRPC_PROTO.getName()));
}
meta.convertNoLowerHeader = TripleProtocol.CONVERT_NO_LOWER_HEADER;
meta.ignoreDefaultVersion = TripleProtocol.IGNORE_1_0_0_VERSION;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index 18eb6894d5..e66eebad79 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
@@ -64,9 +65,11 @@ public class GrpcCompositeCodec implements HttpMessageCodec {
return;
}
- packableMethod = UrlUtils.computeServiceAttribute(
- url, PACKABLE_METHOD_CACHE, k -> new
ConcurrentHashMap<MethodDescriptor, PackableMethod>())
- .computeIfAbsent(methodDescriptor, md -> frameworkModel
+ packableMethod = ConcurrentHashMapUtils.computeIfAbsent(
+ UrlUtils.computeServiceAttribute(
+ url, PACKABLE_METHOD_CACHE, k -> new
ConcurrentHashMap<MethodDescriptor, PackableMethod>()),
+ methodDescriptor,
+ md -> frameworkModel
.getExtensionLoader(PackableMethodFactory.class)
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
.getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY))
diff --git
a/dubbo-serialization/dubbo-serialization-fastjson2/src/main/java/org/apache/dubbo/common/serialize/fastjson2/Fastjson2CreatorManager.java
b/dubbo-serialization/dubbo-serialization-fastjson2/src/main/java/org/apache/dubbo/common/serialize/fastjson2/Fastjson2CreatorManager.java
index 4120dc3943..9b27bff6c2 100644
---
a/dubbo-serialization/dubbo-serialization-fastjson2/src/main/java/org/apache/dubbo/common/serialize/fastjson2/Fastjson2CreatorManager.java
+++
b/dubbo-serialization/dubbo-serialization-fastjson2/src/main/java/org/apache/dubbo/common/serialize/fastjson2/Fastjson2CreatorManager.java
@@ -17,10 +17,10 @@
package org.apache.dubbo.common.serialize.fastjson2;
import org.apache.dubbo.common.aot.NativeDetector;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeClassLoaderListener;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.fastjson2.JSONFactory;
@@ -36,8 +36,8 @@ public class Fastjson2CreatorManager implements
ScopeClassLoaderListener<Framewo
*/
private static final ClassLoader SYSTEM_CLASSLOADER_KEY = new
ClassLoader() {};
- private final Map<ClassLoader, ObjectReaderCreator> readerMap = new
ConcurrentHashMap<>();
- private final Map<ClassLoader, ObjectWriterCreator> writerMap = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ClassLoader, ObjectReaderCreator>
readerMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ClassLoader, ObjectWriterCreator>
writerMap = new ConcurrentHashMap<>();
public Fastjson2CreatorManager(FrameworkModel frameworkModel) {
frameworkModel.addClassLoaderListener(this);
@@ -51,8 +51,10 @@ public class Fastjson2CreatorManager implements
ScopeClassLoaderListener<Framewo
JSONFactory.setContextReaderCreator(readerMap.putIfAbsent(classLoader,
ObjectReaderCreator.INSTANCE));
JSONFactory.setContextWriterCreator(writerMap.putIfAbsent(classLoader,
ObjectWriterCreator.INSTANCE));
} else {
-
JSONFactory.setContextReaderCreator(readerMap.computeIfAbsent(classLoader,
ObjectReaderCreatorASM::new));
-
JSONFactory.setContextWriterCreator(writerMap.computeIfAbsent(classLoader,
ObjectWriterCreatorASM::new));
+ JSONFactory.setContextReaderCreator(
+ ConcurrentHashMapUtils.computeIfAbsent(readerMap,
classLoader, ObjectReaderCreatorASM::new));
+ JSONFactory.setContextWriterCreator(
+ ConcurrentHashMapUtils.computeIfAbsent(writerMap,
classLoader, ObjectWriterCreatorASM::new));
}
}