areyouok commented on a change in pull request #1627: [for 4.6.0] Fix 
concurrent problem in ProducerManager.getAvaliableCha…
URL: https://github.com/apache/rocketmq/pull/1627#discussion_r361597823
 
 

 ##########
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
 ##########
 @@ -36,205 +32,145 @@
 
 public class ProducerManager {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
-    private final Lock groupChannelLock = new ReentrantLock();
-    private final HashMap<String /* group name */, HashMap<Channel, 
ClientChannelInfo>> groupChannelTable =
-        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+    private final ConcurrentHashMap<String /* group name */, 
ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
+        new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Channel> clientChannelTable = new 
ConcurrentHashMap<>();
     private PositiveAtomicCounter positiveAtomicCounter = new 
PositiveAtomicCounter();
 
     public ProducerManager() {
     }
 
-    public HashMap<String, HashMap<Channel, ClientChannelInfo>> 
getGroupChannelTable() {
-        HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> 
newGroupChannelTable =
-            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
-                try {
-                    Iterator<Map.Entry<String, HashMap<Channel, 
ClientChannelInfo>>> iter = groupChannelTable.entrySet().iterator();
-                    while (iter.hasNext()) {
-                        Map.Entry<String, HashMap<Channel, ClientChannelInfo>> 
entry = iter.next();
-                        String key = entry.getKey();
-                        HashMap<Channel, ClientChannelInfo> val = 
entry.getValue();
-                        HashMap<Channel, ClientChannelInfo> tmp = new 
HashMap<Channel, ClientChannelInfo>();
-                        tmp.putAll(val);
-                        newGroupChannelTable.put(key, tmp);
-                    }
-                } finally {
-                    groupChannelLock.unlock();
-                }
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-        return newGroupChannelTable;
+    public ConcurrentHashMap<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>> getGroupChannelTable() {
+        return groupChannelTable;
     }
 
     public void scanNotActiveChannel() {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
-                try {
-                    for (final Map.Entry<String, HashMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable
-                        .entrySet()) {
-                        final String group = entry.getKey();
-                        final HashMap<Channel, ClientChannelInfo> chlMap = 
entry.getValue();
-
-                        Iterator<Entry<Channel, ClientChannelInfo>> it = 
chlMap.entrySet().iterator();
-                        while (it.hasNext()) {
-                            Entry<Channel, ClientChannelInfo> item = it.next();
-                            // final Integer id = item.getKey();
-                            final ClientChannelInfo info = item.getValue();
-
-                            long diff = System.currentTimeMillis() - 
info.getLastUpdateTimestamp();
-                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
-                                it.remove();
-                                clientChannelTable.remove(info.getClientId());
-                                log.warn(
-                                    "SCAN: remove expired channel[{}] from 
ProducerManager groupChannelTable, producer group name: {}",
-                                    
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
-                                RemotingUtil.closeChannel(info.getChannel());
-                            }
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
+        for (final Map.Entry<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable
+                .entrySet()) {
+            final String group = entry.getKey();
+            final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = 
entry.getValue();
+
+            Iterator<Entry<Channel, ClientChannelInfo>> it = 
chlMap.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<Channel, ClientChannelInfo> item = it.next();
+                // final Integer id = item.getKey();
+                final ClientChannelInfo info = item.getValue();
+
+                long diff = System.currentTimeMillis() - 
info.getLastUpdateTimestamp();
+                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                    it.remove();
+                    clientChannelTable.remove(info.getClientId());
+                    log.warn(
+                            "SCAN: remove expired channel[{}] from 
ProducerManager groupChannelTable, producer group name: {}",
+                            
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+                    RemotingUtil.closeChannel(info.getChannel());
                 }
-            } else {
-                log.warn("ProducerManager scanNotActiveChannel lock timeout");
             }
-        } catch (InterruptedException e) {
-            log.error("", e);
         }
     }
 
-    public void doChannelCloseEvent(final String remoteAddr, final Channel 
channel) {
+    public synchronized void doChannelCloseEvent(final String remoteAddr, 
final Channel channel) {
         if (channel != null) {
-            try {
-                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
-                    try {
-                        for (final Map.Entry<String, HashMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable
-                            .entrySet()) {
-                            final String group = entry.getKey();
-                            final HashMap<Channel, ClientChannelInfo> 
clientChannelInfoTable =
-                                entry.getValue();
-                            final ClientChannelInfo clientChannelInfo =
-                                clientChannelInfoTable.remove(channel);
-                            if (clientChannelInfo != null) {
-                                
clientChannelTable.remove(clientChannelInfo.getClientId());
-                                log.info(
-                                    "NETTY EVENT: remove channel[{}][{}] from 
ProducerManager groupChannelTable, producer group: {}",
-                                    clientChannelInfo.toString(), remoteAddr, 
group);
-                            }
-
-                        }
-                    } finally {
-                        this.groupChannelLock.unlock();
-                    }
-                } else {
-                    log.warn("ProducerManager doChannelCloseEvent lock 
timeout");
+            for (final Map.Entry<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable
+                    .entrySet()) {
+                final String group = entry.getKey();
+                final ConcurrentHashMap<Channel, ClientChannelInfo> 
clientChannelInfoTable =
+                        entry.getValue();
+                final ClientChannelInfo clientChannelInfo =
+                        clientChannelInfoTable.remove(channel);
+                if (clientChannelInfo != null) {
+                    clientChannelTable.remove(clientChannelInfo.getClientId());
+                    log.info(
+                            "NETTY EVENT: remove channel[{}][{}] from 
ProducerManager groupChannelTable, producer group: {}",
+                            clientChannelInfo.toString(), remoteAddr, group);
                 }
-            } catch (InterruptedException e) {
-                log.error("", e);
+
             }
         }
     }
 
-    public void registerProducer(final String group, final ClientChannelInfo 
clientChannelInfo) {
-        try {
-            ClientChannelInfo clientChannelInfoFound = null;
-
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
-                    if (null == channelTable) {
-                        channelTable = new HashMap<>();
-                        this.groupChannelTable.put(group, channelTable);
-                    }
-
-                    clientChannelInfoFound = 
channelTable.get(clientChannelInfo.getChannel());
-                    if (null == clientChannelInfoFound) {
-                        channelTable.put(clientChannelInfo.getChannel(), 
clientChannelInfo);
-                        
clientChannelTable.put(clientChannelInfo.getClientId(), 
clientChannelInfo.getChannel());
-                        log.info("new producer connected, group: {} channel: 
{}", group,
-                            clientChannelInfo.toString());
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
+    public synchronized void registerProducer(final String group, final 
ClientChannelInfo clientChannelInfo) {
+        ClientChannelInfo clientChannelInfoFound = null;
 
-                if (clientChannelInfoFound != null) {
-                    
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
-                }
-            } else {
-                log.warn("ProducerManager registerProducer lock timeout");
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
+        if (null == channelTable) {
+            channelTable = new ConcurrentHashMap<>();
+            this.groupChannelTable.put(group, channelTable);
+        }
+
+        clientChannelInfoFound = 
channelTable.get(clientChannelInfo.getChannel());
+        if (null == clientChannelInfoFound) {
+            channelTable.put(clientChannelInfo.getChannel(), 
clientChannelInfo);
+            clientChannelTable.put(clientChannelInfo.getClientId(), 
clientChannelInfo.getChannel());
+            log.info("new producer connected, group: {} channel: {}", group,
+                    clientChannelInfo.toString());
+        }
+
+
+        if (clientChannelInfoFound != null) {
+            
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
         }
     }
 
-    public void unregisterProducer(final String group, final ClientChannelInfo 
clientChannelInfo) {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
-                    if (null != channelTable && !channelTable.isEmpty()) {
-                        ClientChannelInfo old = 
channelTable.remove(clientChannelInfo.getChannel());
-                        
clientChannelTable.remove(clientChannelInfo.getClientId());
-                        if (old != null) {
-                            log.info("unregister a producer[{}] from 
groupChannelTable {}", group,
-                                clientChannelInfo.toString());
-                        }
-
-                        if (channelTable.isEmpty()) {
-                            this.groupChannelTable.remove(group);
-                            log.info("unregister a producer group[{}] from 
groupChannelTable", group);
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
-            } else {
-                log.warn("ProducerManager unregisterProducer lock timeout");
+    public synchronized void unregisterProducer(final String group, final 
ClientChannelInfo clientChannelInfo) {
+        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
+        if (null != channelTable && !channelTable.isEmpty()) {
+            ClientChannelInfo old = 
channelTable.remove(clientChannelInfo.getChannel());
+            clientChannelTable.remove(clientChannelInfo.getClientId());
+            if (old != null) {
+                log.info("unregister a producer[{}] from groupChannelTable 
{}", group,
+                        clientChannelInfo.toString());
+            }
+
+            if (channelTable.isEmpty()) {
+                this.groupChannelTable.remove(group);
+                log.info("unregister a producer group[{}] from 
groupChannelTable", group);
             }
-        } catch (InterruptedException e) {
-            log.error("", e);
         }
     }
 
     public Channel getAvaliableChannel(String groupId) {
 
 Review comment:
   This method is public; changing its name may break compatibility for existing callers.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to