This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 95299c05e9 [ISSUE #6268] fix rocketmq-proxy does not work properly in k8s nodePort mode (#6262)
95299c05e9 is described below

commit 95299c05e9262f4480a95b508d506a014493f38f
Author: Misaki <[email protected]>
AuthorDate: Tue Mar 14 17:21:05 2023 +0800

    [ISSUE #6268] fix rocketmq-proxy does not work properly in k8s nodePort mode (#6262)
    
    * fix rocketmq-proxy does not work properly in k8s nodePort mode
    
    * Add configuration to support k8s nodePort mode
    
    * rename forceUseEndpointPort to useEndpointPortFromRequest
    
    ---------
    
    Co-authored-by: Misaki <Wsywsy22@[email protected]>
---
 .../org/apache/rocketmq/proxy/config/ProxyConfig.java    |  9 +++++++++
 .../rocketmq/proxy/grpc/v2/route/RouteActivity.java      | 16 ++++++++++++----
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 1de7a1ebf1..b6d890aa01 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -148,6 +148,7 @@ public class ProxyConfig implements ConfigFile {
     private int consumerProcessorThreadPoolNums = PROCESSOR_NUMBER;
     private int consumerProcessorThreadPoolQueueCapacity = 10000;
 
+    private boolean useEndpointPortFromRequest = false;
     private int topicRouteServiceCacheExpiredInSeconds = 20;
     private int topicRouteServiceCacheMaxNum = 20000;
     private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER;
@@ -425,6 +426,14 @@ public class ProxyConfig implements ConfigFile {
         this.grpcServerPort = grpcServerPort;
     }
 
+    public boolean isUseEndpointPortFromRequest() {
+        return useEndpointPortFromRequest;
+    }
+
+    public void setUseEndpointPortFromRequest(boolean useEndpointPortFromRequest) {
+        this.useEndpointPortFromRequest = useEndpointPortFromRequest;
+    }
+
     public boolean isTlsTestModeEnable() {
         return tlsTestModeEnable;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index 9983fed44e..29b9034a42 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -162,15 +162,22 @@ public class RouteActivity extends AbstractMessingActivity {
     }
 
     protected List<org.apache.rocketmq.proxy.common.Address> convertToAddressList(Endpoints endpoints) {
-        int port = ConfigurationManager.getProxyConfig().getGrpcServerPort();
+
+        boolean useEndpointPort = ConfigurationManager.getProxyConfig().isUseEndpointPortFromRequest();
+
         List<org.apache.rocketmq.proxy.common.Address> addressList = new ArrayList<>();
         for (Address address : endpoints.getAddressesList()) {
+            int port = ConfigurationManager.getProxyConfig().getGrpcServerPort();
+            if (useEndpointPort) {
+                port = address.getPort();
+            }
             addressList.add(new org.apache.rocketmq.proxy.common.Address(
                 org.apache.rocketmq.proxy.common.Address.AddressScheme.valueOf(endpoints.getScheme().name()),
-                HostAndPort.fromParts(address.getHost(), port))
-            );
+                HostAndPort.fromParts(address.getHost(), port)));
         }
+
         return addressList;
+
     }
 
     protected Map<String /*brokerName*/, Map<Long /*brokerID*/, Broker>> buildBrokerMap(
@@ -207,7 +214,8 @@ public class RouteActivity extends AbstractMessingActivity {
         return brokerMap;
     }
 
-    protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, Resource topic, TopicMessageType topicMessageType, Broker broker) {
+    protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, Resource topic,
+        TopicMessageType topicMessageType, Broker broker) {
         List<MessageQueue> messageQueueList = new ArrayList<>();
 
         int r = 0;

Reply via email to