This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 95299c05e9 [ISSUE #6268] fix rocketmq-proxy does not work properly in
k8s nodePort mode (#6262)
95299c05e9 is described below
commit 95299c05e9262f4480a95b508d506a014493f38f
Author: Misaki <[email protected]>
AuthorDate: Tue Mar 14 17:21:05 2023 +0800
[ISSUE #6268] fix rocketmq-proxy does not work properly in k8s nodePort
mode (#6262)
* fix rocketmq-proxy does not work properly in k8s nodePort mode
* Add configuration to support k8s nodePort mode
* rename forceUseEndpointPort to useEndpointPortFromRequest
---------
Co-authored-by: Misaki <Wsywsy22@[email protected]>
---
.../org/apache/rocketmq/proxy/config/ProxyConfig.java | 9 +++++++++
.../rocketmq/proxy/grpc/v2/route/RouteActivity.java | 16 ++++++++++++----
2 files changed, 21 insertions(+), 4 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 1de7a1ebf1..b6d890aa01 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -148,6 +148,7 @@ public class ProxyConfig implements ConfigFile {
private int consumerProcessorThreadPoolNums = PROCESSOR_NUMBER;
private int consumerProcessorThreadPoolQueueCapacity = 10000;
+ private boolean useEndpointPortFromRequest = false;
private int topicRouteServiceCacheExpiredInSeconds = 20;
private int topicRouteServiceCacheMaxNum = 20000;
private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER;
@@ -425,6 +426,14 @@ public class ProxyConfig implements ConfigFile {
this.grpcServerPort = grpcServerPort;
}
+ public boolean isUseEndpointPortFromRequest() {
+ return useEndpointPortFromRequest;
+ }
+
+ public void setUseEndpointPortFromRequest(boolean
useEndpointPortFromRequest) {
+ this.useEndpointPortFromRequest = useEndpointPortFromRequest;
+ }
+
public boolean isTlsTestModeEnable() {
return tlsTestModeEnable;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index 9983fed44e..29b9034a42 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -162,15 +162,22 @@ public class RouteActivity extends
AbstractMessingActivity {
}
protected List<org.apache.rocketmq.proxy.common.Address>
convertToAddressList(Endpoints endpoints) {
- int port = ConfigurationManager.getProxyConfig().getGrpcServerPort();
+
+ boolean useEndpointPort =
ConfigurationManager.getProxyConfig().isUseEndpointPortFromRequest();
+
List<org.apache.rocketmq.proxy.common.Address> addressList = new
ArrayList<>();
for (Address address : endpoints.getAddressesList()) {
+ int port =
ConfigurationManager.getProxyConfig().getGrpcServerPort();
+ if (useEndpointPort) {
+ port = address.getPort();
+ }
addressList.add(new org.apache.rocketmq.proxy.common.Address(
org.apache.rocketmq.proxy.common.Address.AddressScheme.valueOf(endpoints.getScheme().name()),
- HostAndPort.fromParts(address.getHost(), port))
- );
+ HostAndPort.fromParts(address.getHost(), port)));
}
+
return addressList;
+
}
protected Map<String /*brokerName*/, Map<Long /*brokerID*/, Broker>>
buildBrokerMap(
@@ -207,7 +214,8 @@ public class RouteActivity extends AbstractMessingActivity {
return brokerMap;
}
- protected List<MessageQueue> genMessageQueueFromQueueData(QueueData
queueData, Resource topic, TopicMessageType topicMessageType, Broker broker) {
+ protected List<MessageQueue> genMessageQueueFromQueueData(QueueData
queueData, Resource topic,
+ TopicMessageType topicMessageType, Broker broker) {
List<MessageQueue> messageQueueList = new ArrayList<>();
int r = 0;