This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1b63e8e198 [ISSUE #7945] Make HAProxyMessageForwarder Scalable (#7946)
1b63e8e198 is described below
commit 1b63e8e1981501f5cad761a4474cfc719052fac2
Author: dingshuangxi888 <[email protected]>
AuthorDate: Tue Mar 19 10:34:27 2024 +0800
[ISSUE #7945] Make HAProxyMessageForwarder Scalable (#7946)
---
.../http2proxy/HAProxyMessageForwarder.java | 33 ++++++++++++++++------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
index 6764dbf03b..39d7057bdd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -89,8 +89,6 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel)
throws IllegalAccessException, DecoderException {
String sourceAddress = null, destinationAddress = null;
int sourcePort = 0, destinationPort = 0;
- List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
-
if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
Attribute<?>[] attributes = (Attribute<?>[])
FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
if (ArrayUtils.isEmpty(attributes)) {
@@ -117,12 +115,6 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
if (attribute.key() ==
AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
destinationPort = Integer.parseInt(attributeValue);
}
- if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
- HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue);
- if (haProxyTLV != null) {
- haProxyTLVs.add(haProxyTLV);
- }
- }
}
} else {
String remoteAddr =
RemotingHelper.parseChannelRemoteAddr(inboundChannel);
@@ -137,10 +129,35 @@ public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
HAProxyProxiedProtocol proxiedProtocol =
AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
HAProxyProxiedProtocol.TCP4;
+ List<HAProxyTLV> haProxyTLVs = buildHAProxyTLV(inboundChannel);
+
return new HAProxyMessage(HAProxyProtocolVersion.V2,
HAProxyCommand.PROXY,
proxiedProtocol, sourceAddress, destinationAddress, sourcePort,
destinationPort, haProxyTLVs);
}
+ protected List<HAProxyTLV> buildHAProxyTLV(Channel inboundChannel) throws IllegalAccessException, DecoderException {
+ List<HAProxyTLV> result = new ArrayList<>();
+ if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+ return result;
+ }
+ Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
+ if (ArrayUtils.isEmpty(attributes)) {
+ return result;
+ }
+ for (Attribute<?> attribute : attributes) {
+ String attributeKey = attribute.key().name();
+ if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
+ continue;
+ }
+ String attributeValue = (String) attribute.get();
+ HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue);
+ if (haProxyTLV != null) {
+ result.add(haProxyTLV);
+ }
+ }
+ return result;
+ }
+
protected HAProxyTLV buildHAProxyTLV(String attributeKey, String
attributeValue) throws DecoderException {
String typeString = StringUtils.substringAfter(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
ByteBuf byteBuf = Unpooled.buffer();