This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 23c8efd801e60d91f1a2ab0a7d74a835c56efbeb Author: zhouxiang <[email protected]> AuthorDate: Mon Nov 28 15:54:06 2022 +0800 [ISSUE #5485] Fix GrpcBaseIT --- .../apache/rocketmq/proxy/config/ProxyConfig.java | 26 +++++++++++++++------- .../sysmessage/AbstractSystemMessageSyncer.java | 16 ++++--------- .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 1 + 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index e0f971202..4d5084cfd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -54,16 +54,18 @@ public class ProxyConfig implements ConfigFile { } } - private String rocketMQClusterName = ""; + private String rocketMQClusterName = DEFAULT_CLUSTER_NAME; private String proxyClusterName = DEFAULT_CLUSTER_NAME; private String proxyName = StringUtils.isEmpty(localHostName) ? "DEFAULT_PROXY" : localHostName; private String localServeAddr = ""; - private String systemTopicClusterName = ""; + private String heartbeatSyncerTopicClusterName = ""; private int heartbeatSyncerThreadPoolNums = 4; private int heartbeatSyncerThreadPoolQueueCapacity = 100; + private String heartbeatSyncerTopicName = "DefaultHeartBeatSyncerTopic"; + /** * configuration for ThreadPoolMonitor */ @@ -250,8 +252,8 @@ public class ProxyConfig implements ConfigFile { if (StringUtils.isBlank(remotingAccessAddr)) { this.remotingAccessAddr = this.localServeAddr; } - if (StringUtils.isBlank(systemTopicClusterName)) { - this.systemTopicClusterName = this.rocketMQClusterName; + if (StringUtils.isBlank(heartbeatSyncerTopicClusterName)) { + this.heartbeatSyncerTopicClusterName = this.rocketMQClusterName; } } @@ -324,12 +326,12 @@ public class ProxyConfig implements ConfigFile { this.localServeAddr = localServeAddr; } - public String getSystemTopicClusterName() { - return systemTopicClusterName; + public String getHeartbeatSyncerTopicClusterName() { + return heartbeatSyncerTopicClusterName; } - public void setSystemTopicClusterName(String systemTopicClusterName) { - this.systemTopicClusterName = systemTopicClusterName; + public void setHeartbeatSyncerTopicClusterName(String heartbeatSyncerTopicClusterName) { + this.heartbeatSyncerTopicClusterName = heartbeatSyncerTopicClusterName; } public int getHeartbeatSyncerThreadPoolNums() { @@ -348,6 +350,14 @@ public class ProxyConfig implements ConfigFile { this.heartbeatSyncerThreadPoolQueueCapacity = heartbeatSyncerThreadPoolQueueCapacity; } + public String getHeartbeatSyncerTopicName() { + return heartbeatSyncerTopicName; + } + + public void setHeartbeatSyncerTopicName(String heartbeatSyncerTopicName) { + this.heartbeatSyncerTopicName = heartbeatSyncerTopicName; + } + public boolean isEnablePrintJstack() { return enablePrintJstack; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java index e0a9fd702..d08d5dfb1 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java @@ -49,7 +49,6 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final TopicRouteService topicRouteService; protected final AdminService adminService; - protected final String systemResourceName; protected final MQClientAPIFactory mqClientAPIFactory; protected DefaultMQPushConsumer defaultMQPushConsumer; @@ -57,25 +56,18 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M this.topicRouteService = topicRouteService; this.adminService = adminService; this.mqClientAPIFactory = mqClientAPIFactory; - - this.systemResourceName = this.getSystemResourceName(); - } - - protected String getSystemResourceName() { - ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - return TopicValidator.SYSTEM_TOPIC_PREFIX + "proxy_" + this.getClass().getSimpleName() + "_" + proxyConfig.getProxyClusterName(); } protected String getSystemMessageProducerId() { - return "PID_" + this.systemResourceName; + return "PID_" + getBroadcastTopicName(); } protected String getSystemMessageConsumerId() { - return "CID_" + this.systemResourceName; + return "CID_" + getBroadcastTopicName(); } protected String getBroadcastTopicName() { - return this.systemResourceName; + return ConfigurationManager.getProxyConfig().getHeartbeatSyncerTopicName(); } protected String getSubTag() { @@ -84,7 +76,7 @@ public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, M protected String getBroadcastTopicClusterName() { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); - return proxyConfig.getSystemTopicClusterName(); + return proxyConfig.getHeartbeatSyncerTopicClusterName(); } protected int getBroadcastTopicQueueNum() { diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java index 3fb955a0c..95810b97c 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java @@ -156,6 +156,7 @@ public class GrpcBaseIT extends BaseConf { // Set LongPollingReserveTimeInMillis to 500ms to reserve more time for IT ConfigurationManager.getProxyConfig().setLongPollingReserveTimeInMillis(500); ConfigurationManager.getProxyConfig().setRocketMQClusterName(brokerController1.getBrokerConfig().getBrokerClusterName()); + ConfigurationManager.getProxyConfig().setHeartbeatSyncerTopicClusterName(brokerController1.getBrokerConfig().getBrokerClusterName()); ConfigurationManager.getProxyConfig().setMinInvisibleTimeMillsForRecv(3); }
