This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d0a69be563 [ISSUE #6810] Fix the bug of mistakenly deleting data in clientChannelTable when the channel expire (#7073)
d0a69be563 is described below

commit d0a69be563785ca815dc31ef1aab4c1bc5588c01
Author: zd46319 <[email protected]>
AuthorDate: Thu Jul 27 16:56:41 2023 +0800

    [ISSUE #6810] Fix the bug of mistakenly deleting data in clientChannelTable when the channel expire (#7073)
---
 .../rocketmq/broker/client/ProducerManager.java    |  5 +++-
 .../broker/client/ProducerManagerTest.java         | 34 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 52d67bf282..f9fe1193e2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -112,7 +112,10 @@ public class ProducerManager {
                 long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
                 if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                     it.remove();
-                    clientChannelTable.remove(info.getClientId());
+                    Channel channelInClientTable = clientChannelTable.get(info.getClientId());
+                    if (channelInClientTable != null && channelInClientTable.equals(info.getChannel())) {
+                        clientChannelTable.remove(info.getClientId());
+                    }
                     log.warn(
                             "ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
                             RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index dac5468c87..3d6091e02f 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -79,6 +80,39 @@ public class ProducerManagerTest {
         assertThat(producerManager.findChannel("clientId")).isNull();
     }
 
+    @Test
+    public void scanNotActiveChannelWithSameClientId() throws Exception {
+        producerManager.registerProducer(group, clientInfo);
+        Channel channel1 = Mockito.mock(Channel.class);
+        ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1, clientInfo.getClientId(), LanguageCode.JAVA, 0);
+        producerManager.registerProducer(group, clientInfo1);
+        AtomicReference<String> groupRef = new AtomicReference<>();
+        AtomicReference<ClientChannelInfo> clientChannelInfoRef = new AtomicReference<>();
+        producerManager.appendProducerChangeListener((event, group, clientChannelInfo) -> {
+            switch (event) {
+                case GROUP_UNREGISTER:
+                    groupRef.set(group);
+                    break;
+                case CLIENT_UNREGISTER:
+                    clientChannelInfoRef.set(clientChannelInfo);
+                    break;
+                default:
+                    break;
+            }
+        });
+        assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
+        assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
+        assertThat(producerManager.findChannel("clientId")).isNotNull();
+        Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
+        field.setAccessible(true);
+        long channelExpiredTimeout = field.getLong(producerManager);
+        clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10);
+        when(channel.close()).thenReturn(mock(ChannelFuture.class));
+        producerManager.scanNotActiveChannel();
+        assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
+        assertThat(producerManager.findChannel("clientId")).isNotNull();
+    }
+
     @Test
     public void doChannelCloseEvent() throws Exception {
         producerManager.registerProducer(group, clientInfo);

Reply via email to