This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 07f13fd883 fix: remove unnecessary synchronized to improve concurrency
(#8840)
07f13fd883 is described below
commit 07f13fd883e87e40d7de4827e28913e617fb9832
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Oct 21 09:25:55 2024 +0800
fix: remove unnecessary synchronized to improve concurrency (#8840)
Signed-off-by: Li Zhanhui <[email protected]>
---
.../rocketmq/broker/client/ProducerManager.java | 45 +++++++++++-----------
1 file changed, 23 insertions(+), 22 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index f9fe1193e2..011c9e4be3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -39,11 +40,11 @@ public class ProducerManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
- private final ConcurrentHashMap<String /* group name */,
ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
+ private final ConcurrentMap<String /* group name */,
ConcurrentMap<Channel, ClientChannelInfo>> groupChannelTable =
new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, Channel> clientChannelTable = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Channel> clientChannelTable = new
ConcurrentHashMap<>();
protected final BrokerStatsManager brokerStatsManager;
- private PositiveAtomicCounter positiveAtomicCounter = new
PositiveAtomicCounter();
+ private final PositiveAtomicCounter positiveAtomicCounter = new
PositiveAtomicCounter();
private final List<ProducerChangeListener> producerChangeListenerList =
new CopyOnWriteArrayList<>();
public ProducerManager() {
@@ -63,7 +64,7 @@ public class ProducerManager {
return channels != null && !channels.isEmpty();
}
- public ConcurrentHashMap<String, ConcurrentHashMap<Channel,
ClientChannelInfo>> getGroupChannelTable() {
+ public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>>
getGroupChannelTable() {
return groupChannelTable;
}
@@ -95,13 +96,13 @@ public class ProducerManager {
}
public void scanNotActiveChannel() {
- Iterator<Map.Entry<String, ConcurrentHashMap<Channel,
ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
+ Iterator<Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>>>
iterator = this.groupChannelTable.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
entry = iterator.next();
+ Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry
= iterator.next();
final String group = entry.getKey();
- final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap =
entry.getValue();
+ final ConcurrentMap<Channel, ClientChannelInfo> chlMap =
entry.getValue();
Iterator<Entry<Channel, ClientChannelInfo>> it =
chlMap.entrySet().iterator();
while (it.hasNext()) {
@@ -132,16 +133,13 @@ public class ProducerManager {
}
}
- public synchronized boolean doChannelCloseEvent(final String remoteAddr,
final Channel channel) {
+ public boolean doChannelCloseEvent(final String remoteAddr, final Channel
channel) {
boolean removed = false;
if (channel != null) {
- for (final Map.Entry<String, ConcurrentHashMap<Channel,
ClientChannelInfo>> entry : this.groupChannelTable
- .entrySet()) {
+ for (final Map.Entry<String, ConcurrentMap<Channel,
ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
final String group = entry.getKey();
- final ConcurrentHashMap<Channel, ClientChannelInfo>
clientChannelInfoTable =
- entry.getValue();
- final ClientChannelInfo clientChannelInfo =
- clientChannelInfoTable.remove(channel);
+ final ConcurrentMap<Channel, ClientChannelInfo>
clientChannelInfoTable = entry.getValue();
+ final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
clientChannelTable.remove(clientChannelInfo.getClientId());
removed = true;
@@ -150,7 +148,7 @@ public class ProducerManager {
clientChannelInfo.toString(), remoteAddr, group);
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group,
clientChannelInfo);
if (clientChannelInfoTable.isEmpty()) {
- ConcurrentHashMap<Channel, ClientChannelInfo>
oldGroupTable = this.groupChannelTable.remove(group);
+ ConcurrentMap<Channel, ClientChannelInfo>
oldGroupTable = this.groupChannelTable.remove(group);
if (oldGroupTable != null) {
log.info("unregister a producer group[{}] from
groupChannelTable", group);
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
@@ -163,13 +161,16 @@ public class ProducerManager {
return removed;
}
- public synchronized void registerProducer(final String group, final
ClientChannelInfo clientChannelInfo) {
- ClientChannelInfo clientChannelInfoFound = null;
+ public void registerProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+ ClientChannelInfo clientChannelInfoFound;
- ConcurrentHashMap<Channel, ClientChannelInfo> channelTable =
this.groupChannelTable.get(group);
+ ConcurrentMap<Channel, ClientChannelInfo> channelTable =
this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
- this.groupChannelTable.put(group, channelTable);
+ ConcurrentMap<Channel, ClientChannelInfo> prev =
this.groupChannelTable.putIfAbsent(group, channelTable);
+ if (null != prev) {
+ channelTable = prev;
+ }
}
clientChannelInfoFound =
channelTable.get(clientChannelInfo.getChannel());
@@ -186,8 +187,8 @@ public class ProducerManager {
}
}
- public synchronized void unregisterProducer(final String group, final
ClientChannelInfo clientChannelInfo) {
- ConcurrentHashMap<Channel, ClientChannelInfo> channelTable =
this.groupChannelTable.get(group);
+ public void unregisterProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+ ConcurrentMap<Channel, ClientChannelInfo> channelTable =
this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old =
channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
@@ -210,7 +211,7 @@ public class ProducerManager {
return null;
}
List<Channel> channelList;
- ConcurrentHashMap<Channel, ClientChannelInfo>
channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+ ConcurrentMap<Channel, ClientChannelInfo>
channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
if (channelClientChannelInfoHashMap != null) {
channelList = new
ArrayList<>(channelClientChannelInfoHashMap.keySet());
} else {