This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4b311ff4dd36cb2ad6ef160efae9f07d6985b286 Author: zhouxiang <[email protected]> AuthorDate: Thu Dec 8 19:44:43 2022 +0800 [ISSUE #5485] Use local address, remoting port and grpc port to build unique local proxy Id --- .../proxy/service/sysmessage/HeartbeatSyncer.java | 16 +++++++++++----- .../proxy/service/sysmessage/HeartbeatSyncerData.java | 16 ++++++++-------- .../proxy/service/sysmessage/HeartbeatSyncerTest.java | 8 ++++---- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java index 9e333902e..51af02170 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java @@ -50,11 +50,13 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { protected ThreadPoolExecutor threadPoolExecutor; protected ConsumerManager consumerManager; protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>(); + protected String localProxyId; public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService, ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) { super(topicRouteService, adminService, mqClientAPIFactory); this.consumerManager = consumerManager; + this.localProxyId = buildLocalProxyId(); this.init(); } @@ -104,7 +106,6 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { try { this.threadPoolExecutor.submit(() -> { try { - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel()); if (remoteChannel == null) { return; @@ -118,7 +119,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { consumeType, messageModel, consumeFromWhere, - proxyConfig.getLocalServeAddr(), + localProxyId, remoteChannel.encode() ); data.setSubscriptionDataSet(subList); @@ -143,7 +144,6 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { try { this.threadPoolExecutor.submit(() -> { try { - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); RemoteChannel remoteChannel = RemoteChannel.create(clientChannelInfo.getChannel()); if (remoteChannel == null) { return; @@ -157,7 +157,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { null, null, null, - proxyConfig.getLocalServeAddr(), + localProxyId, remoteChannel.encode() ); @@ -183,7 +183,7 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { for (MessageExt msg : msgs) { try { HeartbeatSyncerData data = JSON.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class); - if (data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr())) { + if (data.getLocalProxyId().equals(localProxyId)) { continue; } @@ -221,4 +221,10 @@ public class HeartbeatSyncer extends AbstractSystemMessageSyncer { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } + + private String buildLocalProxyId() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + // use local address, remoting port and grpc port to build unique local proxy Id + return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort(); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java index 10c6f1206..97760506f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java @@ -36,7 +36,7 @@ public class HeartbeatSyncerData { private ConsumeType consumeType; private MessageModel messageModel; private ConsumeFromWhere consumeFromWhere; - private String connectProxyIp; + private String localProxyId; private String channelData; public HeartbeatSyncerData() { @@ -45,7 +45,7 @@ public class HeartbeatSyncerData { public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId, LanguageCode language, int version, String group, ConsumeType consumeType, MessageModel messageModel, - ConsumeFromWhere consumeFromWhere, String connectProxyIp, + ConsumeFromWhere consumeFromWhere, String localProxyId, String channelData) { this.heartbeatType = heartbeatType; this.clientId = clientId; @@ -55,7 +55,7 @@ public class HeartbeatSyncerData { this.consumeType = consumeType; this.messageModel = messageModel; this.consumeFromWhere = consumeFromWhere; - this.connectProxyIp = connectProxyIp; + this.localProxyId = localProxyId; this.channelData = channelData; } @@ -140,12 +140,12 @@ public class HeartbeatSyncerData { this.consumeFromWhere = consumeFromWhere; } - public String getConnectProxyIp() { - return connectProxyIp; + public String getLocalProxyId() { + return localProxyId; } - public void setConnectProxyIp(String connectProxyIp) { - this.connectProxyIp = connectProxyIp; + public void setLocalProxyId(String localProxyId) { + this.localProxyId = localProxyId; } public String getChannelData() { @@ -169,7 +169,7 @@ public class HeartbeatSyncerData { .add("consumeType", consumeType) .add("messageModel", messageModel) .add("consumeFromWhere", consumeFromWhere) - .add("connectProxyIp", connectProxyIp) + .add("connectProxyIp", localProxyId) .add("channelData", channelData) .toString(); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 95152186d..df98f31dc 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -185,7 +185,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr(); // change local serve addr, to simulate other proxy receive messages - ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10); ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean()); @@ -207,7 +207,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean()); // change local serve addr, to simulate other proxy receive messages - ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10); heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null); assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel()); } @@ -248,7 +248,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr(); // change local serve addr, to simulate other proxy receive messages - ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10); ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean()); @@ -270,7 +270,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean()); // change local serve addr, to simulate other proxy receive messages - ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10); heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null); assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel()); }
