This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3808387e13 [ISSUE #7429] clean channel map when CLIENT_UNREGISTER in
proxy
3808387e13 is described below
commit 3808387e1389278edbe4ef023d200ecb3015622b
Author: lk <[email protected]>
AuthorDate: Mon Oct 9 16:07:56 2023 +0800
[ISSUE #7429] clean channel map when CLIENT_UNREGISTER in proxy
---
.../proxy/service/sysmessage/HeartbeatSyncer.java | 31 ++++++----
.../service/sysmessage/HeartbeatSyncerTest.java | 68 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 11 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index f70c06b8f4..fee3ea87d2 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.proxy.service.sysmessage;
import com.alibaba.fastjson.JSON;
+import io.netty.channel.Channel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@@ -73,16 +74,8 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
);
this.consumerManager.appendConsumerIdsChangeListener(new
ConsumerIdsChangeListener() {
@Override
- public void handle(ConsumerGroupEvent event, String s, Object...
args) {
- if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
- if (args == null || args.length < 1) {
- return;
- }
- if (args[0] instanceof ClientChannelInfo) {
- ClientChannelInfo clientChannelInfo =
(ClientChannelInfo) args[0];
-
remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
- }
- }
+ public void handle(ConsumerGroupEvent event, String group,
Object... args) {
+ processConsumerGroupEvent(event, group, args);
}
@Override
@@ -98,6 +91,18 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
super.shutdown();
}
+ protected void processConsumerGroupEvent(ConsumerGroupEvent event, String
group, Object... args) {
+ if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
+ if (args == null || args.length < 1) {
+ return;
+ }
+ if (args[0] instanceof ClientChannelInfo) {
+ ClientChannelInfo clientChannelInfo = (ClientChannelInfo)
args[0];
+ remoteChannelMap.remove(buildKey(group,
clientChannelInfo.getChannel()));
+ }
+ }
+ }
+
public void onConsumerRegister(String consumerGroup, ClientChannelInfo
clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere
consumeFromWhere,
Set<SubscriptionData> subList) {
@@ -189,7 +194,7 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
}
RemoteChannel decodedChannel =
RemoteChannel.decode(data.getChannelData());
- RemoteChannel channel =
remoteChannelMap.computeIfAbsent(data.getGroup() + "@" +
decodedChannel.id().asLongText(), key -> decodedChannel);
+ RemoteChannel channel =
remoteChannelMap.computeIfAbsent(buildKey(data.getGroup(), decodedChannel), key
-> decodedChannel);
channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
@@ -228,4 +233,8 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
// use local address, remoting port and grpc port to build unique
local proxy Id
return proxyConfig.getLocalServeAddr() + "%" +
proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
}
+
+ private static String buildKey(String group, Channel channel) {
+ return group + "@" + channel.id().asLongText();
+ }
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 43fba3d03c..9a2c5e3437 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -320,6 +322,72 @@ public class HeartbeatSyncerTest extends InitConfigTest {
}
}
+ @Test
+ public void testProcessConsumerGroupEventForRemoting() {
+ String consumerGroup = "consumerGroup";
+ Channel channel = createMockChannel();
+ RemotingProxyOutClient remotingProxyOutClient =
mock(RemotingProxyOutClient.class);
+ RemotingChannel remotingChannel = new
RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId,
Collections.emptySet());
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ remotingChannel,
+ clientId,
+ LanguageCode.JAVA,
+ 4
+ );
+
+ testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo);
+ }
+
+ @Test
+ public void testProcessConsumerGroupEventForGrpcV2() {
+ String consumerGroup = "consumerGroup";
+ GrpcClientSettingsManager grpcClientSettingsManager =
mock(GrpcClientSettingsManager.class);
+ GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
+ GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
+ proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
+
ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
+ clientId);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ grpcClientChannel,
+ clientId,
+ LanguageCode.JAVA,
+ 5
+ );
+
+ testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo);
+ }
+
+ private void testProcessConsumerGroupEvent(String consumerGroup,
ClientChannelInfo clientChannelInfo) {
+ HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory, null);
+ SendResult okSendResult = new SendResult();
+ okSendResult.setSendStatus(SendStatus.SEND_OK);
+
+ ArgumentCaptor<Message> messageArgumentCaptor =
ArgumentCaptor.forClass(Message.class);
+
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
+ .sendMessageAsync(anyString(), anyString(),
messageArgumentCaptor.capture(), any(), anyLong());
+
+ heartbeatSyncer.onConsumerRegister(
+ consumerGroup,
+ clientChannelInfo,
+ ConsumeType.CONSUME_PASSIVELY,
+ MessageModel.CLUSTERING,
+ ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
+ Collections.emptySet()
+ );
+ await().atMost(Duration.ofSeconds(3)).until(() ->
messageArgumentCaptor.getAllValues().size() == 1);
+
+ // change local serve addr, to simulate other proxy receive messages
+ heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
+ ArgumentCaptor<ClientChannelInfo> channelInfoArgumentCaptor =
ArgumentCaptor.forClass(ClientChannelInfo.class);
+ doReturn(true).when(consumerManager).registerConsumer(anyString(),
channelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());
+
+
heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()),
null);
+ assertEquals(1, heartbeatSyncer.remoteChannelMap.size());
+
+
heartbeatSyncer.processConsumerGroupEvent(ConsumerGroupEvent.CLIENT_UNREGISTER,
consumerGroup, channelInfoArgumentCaptor.getValue());
+ assertTrue(heartbeatSyncer.remoteChannelMap.isEmpty());
+ }
+
private MessageExt convertFromMessage(Message message) {
MessageExt messageExt = new MessageExt();
messageExt.setTopic(message.getTopic());