This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a33f1d1c72 [ISSUE #7875] Add constructor for ProxyTopicRouteData (#7876)
a33f1d1c72 is described below
commit a33f1d1c72a785744f586ce2e92a45f25dd5ebd5
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Thu Feb 29 17:25:02 2024 +0800
[ISSUE #7875] Add constructor for ProxyTopicRouteData (#7876)
---
.../service/route/ClusterTopicRouteService.java | 19 +-------
.../service/route/LocalTopicRouteService.java | 23 +--------
.../proxy/service/route/ProxyTopicRouteData.java | 56 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 38 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
index 84252f8b8e..a4df98971c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
@@ -17,11 +17,10 @@
package org.apache.rocketmq.proxy.service.route;
import java.util.List;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
public class ClusterTopicRouteService extends TopicRouteService {
@@ -39,21 +38,7 @@ public class ClusterTopicRouteService extends TopicRouteService {
public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx,
List<Address> requestHostAndPortList,
String topicName) throws Exception {
TopicRouteData topicRouteData = getAllMessageQueueView(ctx,
topicName).getTopicRouteData();
-
- ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
- proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());
-
- for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
- ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new
ProxyTopicRouteData.ProxyBrokerData();
- proxyBrokerData.setCluster(brokerData.getCluster());
- proxyBrokerData.setBrokerName(brokerData.getBrokerName());
- for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
- proxyBrokerData.getBrokerAddrs().put(brokerId,
requestHostAndPortList);
- }
- proxyTopicRouteData.getBrokerDatas().add(proxyBrokerData);
- }
-
- return proxyTopicRouteData;
+ return new ProxyTopicRouteData(topicRouteData, requestHostAndPortList);
}
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
index aced15cee5..f2a42c0aed 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
@@ -17,10 +17,10 @@
package org.apache.rocketmq.proxy.service.route;
import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -28,7 +28,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
@@ -62,25 +61,7 @@ public class LocalTopicRouteService extends TopicRouteService {
String topicName) throws Exception {
MessageQueueView messageQueueView = getAllMessageQueueView(ctx,
topicName);
TopicRouteData topicRouteData = messageQueueView.getTopicRouteData();
-
- ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
- proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());
-
- for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
- ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new
ProxyTopicRouteData.ProxyBrokerData();
- proxyBrokerData.setCluster(brokerData.getCluster());
- proxyBrokerData.setBrokerName(brokerData.getBrokerName());
- for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
- String brokerAddr = brokerData.getBrokerAddrs().get(brokerId);
- HostAndPort brokerHostAndPort =
HostAndPort.fromString(brokerAddr);
- HostAndPort grpcHostAndPort =
HostAndPort.fromParts(brokerHostAndPort.getHost(), grpcPort);
-
- proxyBrokerData.getBrokerAddrs().put(brokerId,
Lists.newArrayList(new Address(Address.AddressScheme.IPv4, grpcHostAndPort)));
- }
- proxyTopicRouteData.getBrokerDatas().add(proxyBrokerData);
- }
-
- return proxyTopicRouteData;
+ return new ProxyTopicRouteData(topicRouteData, grpcPort);
}
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java
index da8b3f6112..63651f6fe8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.proxy.service.route;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -27,6 +29,60 @@ import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
public class ProxyTopicRouteData {
+ public ProxyTopicRouteData() {
+ }
+
+ public ProxyTopicRouteData(TopicRouteData topicRouteData) {
+ this.queueDatas = topicRouteData.getQueueDatas();
+ this.brokerDatas = new ArrayList<>();
+
+ for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+ ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new
ProxyTopicRouteData.ProxyBrokerData();
+ proxyBrokerData.setCluster(brokerData.getCluster());
+ proxyBrokerData.setBrokerName(brokerData.getBrokerName());
+ for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
+ String brokerAddr = brokerData.getBrokerAddrs().get(brokerId);
+ HostAndPort hostAndPort = HostAndPort.fromString(brokerAddr);
+
+ proxyBrokerData.getBrokerAddrs().put(brokerId,
Lists.newArrayList(new Address(Address.AddressScheme.IPv4, hostAndPort)));
+ }
+ this.brokerDatas.add(proxyBrokerData);
+ }
+ }
+
+ public ProxyTopicRouteData(TopicRouteData topicRouteData, int port) {
+ this.queueDatas = topicRouteData.getQueueDatas();
+ this.brokerDatas = new ArrayList<>();
+
+ for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+ ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new
ProxyTopicRouteData.ProxyBrokerData();
+ proxyBrokerData.setCluster(brokerData.getCluster());
+ proxyBrokerData.setBrokerName(brokerData.getBrokerName());
+ for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
+ String brokerAddr = brokerData.getBrokerAddrs().get(brokerId);
+ HostAndPort brokerHostAndPort =
HostAndPort.fromString(brokerAddr);
+ HostAndPort hostAndPort =
HostAndPort.fromParts(brokerHostAndPort.getHost(), port);
+
+ proxyBrokerData.getBrokerAddrs().put(brokerId,
Lists.newArrayList(new Address(Address.AddressScheme.IPv4, hostAndPort)));
+ }
+ this.brokerDatas.add(proxyBrokerData);
+ }
+ }
+
+ public ProxyTopicRouteData(TopicRouteData topicRouteData, List<Address>
requestHostAndPortList) {
+ this.queueDatas = topicRouteData.getQueueDatas();
+ this.brokerDatas = new ArrayList<>();
+
+ for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+ ProxyTopicRouteData.ProxyBrokerData proxyBrokerData = new
ProxyTopicRouteData.ProxyBrokerData();
+ proxyBrokerData.setCluster(brokerData.getCluster());
+ proxyBrokerData.setBrokerName(brokerData.getBrokerName());
+ for (Long brokerId : brokerData.getBrokerAddrs().keySet()) {
+ proxyBrokerData.getBrokerAddrs().put(brokerId,
requestHostAndPortList);
+ }
+ this.brokerDatas.add(proxyBrokerData);
+ }
+ }
public static class ProxyBrokerData {
private String cluster;