This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new aea5811df [ISSUE #6884] Resolve proxy being unable to find the related
ACL configuration when sending heartbeats to broker (#6885)
aea5811df is described below
commit aea5811df007c2abf2d46eea931e4c867514e0eb
Author: 城南少年与猫 <[email protected]>
AuthorDate: Thu Jun 15 10:00:45 2023 +0800
[ISSUE #6884] Resolve proxy being unable to find the related ACL
configuration when sending heartbeats to broker (#6885)
Co-authored-by: fengbaichao <[email protected]>
---
.../org/apache/rocketmq/proxy/service/ClusterServiceManager.java | 2 +-
.../rocketmq/proxy/service/client/ClusterConsumerManager.java | 5 +++--
.../proxy/service/sysmessage/AbstractSystemMessageSyncer.java | 8 +++++---
.../apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java | 5 +++--
.../rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java | 4 ++--
5 files changed, 14 insertions(+), 10 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 95cc4d149..d2ddfc352 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -97,7 +97,7 @@ public class ClusterServiceManager extends
AbstractStartAndShutdown implements S
this.adminService = new
DefaultAdminService(this.operationClientAPIFactory);
this.producerManager = new ProducerManager();
- this.consumerManager = new
ClusterConsumerManager(this.topicRouteService, this.adminService,
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(),
proxyConfig.getChannelExpiredTimeout());
+ this.consumerManager = new
ClusterConsumerManager(this.topicRouteService, this.adminService,
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(),
proxyConfig.getChannelExpiredTimeout(), rpcHook);
this.transactionClientAPIFactory = new MQClientAPIFactory(
nameserverAccessConfig,
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
index 07aeb23fc..65a4569f8 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -36,9 +37,9 @@ public class ClusterConsumerManager extends ConsumerManager
implements StartAndS
protected HeartbeatSyncer heartbeatSyncer;
public ClusterConsumerManager(TopicRouteService topicRouteService,
AdminService adminService,
- MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener
consumerIdsChangeListener, long channelExpiredTimeout) {
+ MQClientAPIFactory mqClientAPIFactory,
ConsumerIdsChangeListener consumerIdsChangeListener, long
channelExpiredTimeout, RPCHook rpcHook) {
super(consumerIdsChangeListener, channelExpiredTimeout);
- this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService,
adminService, this, mqClientAPIFactory);
+ this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService,
adminService, this, mqClientAPIFactory, rpcHook);
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index 2ef849737..fcdc25cac 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -51,12 +51,14 @@ public abstract class AbstractSystemMessageSyncer
implements StartAndShutdown, M
protected final TopicRouteService topicRouteService;
protected final AdminService adminService;
protected final MQClientAPIFactory mqClientAPIFactory;
+ protected final RPCHook rpcHook;
protected DefaultMQPushConsumer defaultMQPushConsumer;
- public AbstractSystemMessageSyncer(TopicRouteService topicRouteService,
AdminService adminService, MQClientAPIFactory mqClientAPIFactory) {
+ public AbstractSystemMessageSyncer(TopicRouteService topicRouteService,
AdminService adminService, MQClientAPIFactory mqClientAPIFactory, RPCHook
rpcHook) {
this.topicRouteService = topicRouteService;
this.adminService = adminService;
this.mqClientAPIFactory = mqClientAPIFactory;
+ this.rpcHook = rpcHook;
}
protected String getSystemMessageProducerId() {
@@ -84,8 +86,8 @@ public abstract class AbstractSystemMessageSyncer implements
StartAndShutdown, M
return 1;
}
- protected RPCHook getRpcHook() {
- return null;
+ public RPCHook getRpcHook() {
+ return rpcHook;
}
protected void sendSystemMessage(Object data) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 3333ebd2d..f70c06b8f 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -41,6 +41,7 @@ import
org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -53,8 +54,8 @@ public class HeartbeatSyncer extends
AbstractSystemMessageSyncer {
protected String localProxyId;
public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService
adminService,
- ConsumerManager consumerManager, MQClientAPIFactory
mqClientAPIFactory) {
- super(topicRouteService, adminService, mqClientAPIFactory);
+ ConsumerManager consumerManager, MQClientAPIFactory
mqClientAPIFactory, RPCHook rpcHook) {
+ super(topicRouteService, adminService, mqClientAPIFactory, rpcHook);
this.consumerManager = consumerManager;
this.localProxyId = buildLocalProxyId();
this.init();
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 6373aba30..c67f4953d 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -172,7 +172,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
.build();
when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(settings);
- HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory);
+ HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory, null);
heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
@@ -240,7 +240,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
4
);
- HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory);
+ HeartbeatSyncer heartbeatSyncer = new
HeartbeatSyncer(topicRouteService, adminService, consumerManager,
mqClientAPIFactory, null);
SendResult okSendResult = new SendResult();
okSendResult.setSendStatus(SendStatus.SEND_OK);
{