This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1b63e8e198 [ISSUE #7945] Make HAProxyMessageForwarder Scalable (#7946)
1b63e8e198 is described below

commit 1b63e8e1981501f5cad761a4474cfc719052fac2
Author: dingshuangxi888 <[email protected]>
AuthorDate: Tue Mar 19 10:34:27 2024 +0800

    [ISSUE #7945] Make HAProxyMessageForwarder Scalable (#7946)
---
 .../http2proxy/HAProxyMessageForwarder.java        | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
index 6764dbf03b..39d7057bdd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -89,8 +89,6 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
     protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) 
throws IllegalAccessException, DecoderException {
         String sourceAddress = null, destinationAddress = null;
         int sourcePort = 0, destinationPort = 0;
-        List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
-
         if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
             Attribute<?>[] attributes = (Attribute<?>[]) 
FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
             if (ArrayUtils.isEmpty(attributes)) {
@@ -117,12 +115,6 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
                 if (attribute.key() == 
AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
                     destinationPort = Integer.parseInt(attributeValue);
                 }
-                if (StringUtils.startsWith(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
-                    HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, 
attributeValue);
-                    if (haProxyTLV != null) {
-                        haProxyTLVs.add(haProxyTLV);
-                    }
-                }
             }
         } else {
             String remoteAddr = 
RemotingHelper.parseChannelRemoteAddr(inboundChannel);
@@ -137,10 +129,35 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
         HAProxyProxiedProtocol proxiedProtocol = 
AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
             HAProxyProxiedProtocol.TCP4;
 
+        List<HAProxyTLV> haProxyTLVs = buildHAProxyTLV(inboundChannel);
+
         return new HAProxyMessage(HAProxyProtocolVersion.V2, 
HAProxyCommand.PROXY,
             proxiedProtocol, sourceAddress, destinationAddress, sourcePort, 
destinationPort, haProxyTLVs);
     }
 
+    protected List<HAProxyTLV> buildHAProxyTLV(Channel inboundChannel) throws 
IllegalAccessException, DecoderException {
+        List<HAProxyTLV> result = new ArrayList<>();
+        if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+            return result;
+        }
+        Attribute<?>[] attributes = (Attribute<?>[]) 
FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
+        if (ArrayUtils.isEmpty(attributes)) {
+            return result;
+        }
+        for (Attribute<?> attribute : attributes) {
+            String attributeKey = attribute.key().name();
+            if (!StringUtils.startsWith(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
+                continue;
+            }
+            String attributeValue = (String) attribute.get();
+            HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, 
attributeValue);
+            if (haProxyTLV != null) {
+                result.add(haProxyTLV);
+            }
+        }
+        return result;
+    }
+
     protected HAProxyTLV buildHAProxyTLV(String attributeKey, String 
attributeValue) throws DecoderException {
         String typeString = StringUtils.substringAfter(attributeKey, 
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
         ByteBuf byteBuf = Unpooled.buffer();

Reply via email to