This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4b311ff4dd36cb2ad6ef160efae9f07d6985b286
Author: zhouxiang <[email protected]>
AuthorDate: Thu Dec 8 19:44:43 2022 +0800

    [ISSUE #5485] Use local address, remoting port and grpc port to build unique local proxy Id
---
 .../proxy/service/sysmessage/HeartbeatSyncer.java        | 16 +++++++++++-----
 .../proxy/service/sysmessage/HeartbeatSyncerData.java    | 16 ++++++++--------
 .../proxy/service/sysmessage/HeartbeatSyncerTest.java    |  8 ++++----
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 9e333902e..51af02170 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -50,11 +50,13 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
     protected ThreadPoolExecutor threadPoolExecutor;
     protected ConsumerManager consumerManager;
     protected final Map<String /* channelId as longText */, RemoteChannel> 
remoteChannelMap = new ConcurrentHashMap<>();
+    protected String localProxyId;
 
     public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService 
adminService,
         ConsumerManager consumerManager, MQClientAPIFactory 
mqClientAPIFactory) {
         super(topicRouteService, adminService, mqClientAPIFactory);
         this.consumerManager = consumerManager;
+        this.localProxyId = buildLocalProxyId();
         this.init();
     }
 
@@ -104,7 +106,6 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
         try {
             this.threadPoolExecutor.submit(() -> {
                 try {
-                    ProxyConfig proxyConfig = 
ConfigurationManager.getProxyConfig();
                     RemoteChannel remoteChannel = 
RemoteChannel.create(clientChannelInfo.getChannel());
                     if (remoteChannel == null) {
                         return;
@@ -118,7 +119,7 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
                         consumeType,
                         messageModel,
                         consumeFromWhere,
-                        proxyConfig.getLocalServeAddr(),
+                        localProxyId,
                         remoteChannel.encode()
                     );
                     data.setSubscriptionDataSet(subList);
@@ -143,7 +144,6 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
         try {
             this.threadPoolExecutor.submit(() -> {
                 try {
-                    ProxyConfig proxyConfig = 
ConfigurationManager.getProxyConfig();
                     RemoteChannel remoteChannel = 
RemoteChannel.create(clientChannelInfo.getChannel());
                     if (remoteChannel == null) {
                         return;
@@ -157,7 +157,7 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
                         null,
                         null,
                         null,
-                        proxyConfig.getLocalServeAddr(),
+                        localProxyId,
                         remoteChannel.encode()
                     );
 
@@ -183,7 +183,7 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
         for (MessageExt msg : msgs) {
             try {
                 HeartbeatSyncerData data = JSON.parseObject(new 
String(msg.getBody(), StandardCharsets.UTF_8), HeartbeatSyncerData.class);
-                if 
(data.getConnectProxyIp().equals(ConfigurationManager.getProxyConfig().getLocalServeAddr()))
 {
+                if (data.getLocalProxyId().equals(localProxyId)) {
                     continue;
                 }
 
@@ -221,4 +221,10 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
 
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
+
+    private String buildLocalProxyId() {
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        // use local address, remoting port and grpc port to build unique 
local proxy Id
+        return proxyConfig.getLocalServeAddr() + "%" + 
proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
index 10c6f1206..97760506f 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -36,7 +36,7 @@ public class HeartbeatSyncerData {
     private ConsumeType consumeType;
     private MessageModel messageModel;
     private ConsumeFromWhere consumeFromWhere;
-    private String connectProxyIp;
+    private String localProxyId;
     private String channelData;
 
     public HeartbeatSyncerData() {
@@ -45,7 +45,7 @@ public class HeartbeatSyncerData {
     public HeartbeatSyncerData(HeartbeatType heartbeatType, String clientId,
         LanguageCode language, int version, String group,
         ConsumeType consumeType, MessageModel messageModel,
-        ConsumeFromWhere consumeFromWhere, String connectProxyIp,
+        ConsumeFromWhere consumeFromWhere, String localProxyId,
         String channelData) {
         this.heartbeatType = heartbeatType;
         this.clientId = clientId;
@@ -55,7 +55,7 @@ public class HeartbeatSyncerData {
         this.consumeType = consumeType;
         this.messageModel = messageModel;
         this.consumeFromWhere = consumeFromWhere;
-        this.connectProxyIp = connectProxyIp;
+        this.localProxyId = localProxyId;
         this.channelData = channelData;
     }
 
@@ -140,12 +140,12 @@ public class HeartbeatSyncerData {
         this.consumeFromWhere = consumeFromWhere;
     }
 
-    public String getConnectProxyIp() {
-        return connectProxyIp;
+    public String getLocalProxyId() {
+        return localProxyId;
     }
 
-    public void setConnectProxyIp(String connectProxyIp) {
-        this.connectProxyIp = connectProxyIp;
+    public void setLocalProxyId(String localProxyId) {
+        this.localProxyId = localProxyId;
     }
 
     public String getChannelData() {
@@ -169,7 +169,7 @@ public class HeartbeatSyncerData {
             .add("consumeType", consumeType)
             .add("messageModel", messageModel)
             .add("consumeFromWhere", consumeFromWhere)
-            .add("connectProxyIp", connectProxyIp)
+            .add("connectProxyIp", localProxyId)
             .add("channelData", channelData)
             .toString();
     }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 95152186d..df98f31dc 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -185,7 +185,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
 
         String localServeAddr = 
ConfigurationManager.getProxyConfig().getLocalServeAddr();
         // change local serve addr, to simulate other proxy receive messages
-        
ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
         doReturn(true).when(consumerManager).registerConsumer(anyString(), 
syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), 
anyBoolean());
 
@@ -207,7 +207,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         doNothing().when(consumerManager).unregisterConsumer(anyString(), 
syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
 
         // change local serve addr, to simulate other proxy receive messages
-        
ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))),
 null);
         assertSame(channelInfoList.get(0).getChannel(), 
syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
     }
@@ -248,7 +248,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
 
         String localServeAddr = 
ConfigurationManager.getProxyConfig().getLocalServeAddr();
         // change local serve addr, to simulate other proxy receive messages
-        
ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
         doReturn(true).when(consumerManager).registerConsumer(anyString(), 
syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), 
anyBoolean());
 
@@ -270,7 +270,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         doNothing().when(consumerManager).unregisterConsumer(anyString(), 
syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean());
 
         // change local serve addr, to simulate other proxy receive messages
-        
ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10));
+        heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
         
heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))),
 null);
         assertSame(channelInfoList.get(0).getChannel(), 
syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel());
     }

Reply via email to