This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 92e058c823 [ISSUE #7774] Make the handle of ppv2 tlv more extendable
(#7775)
92e058c823 is described below
commit 92e058c823d7d064834710ff470a0d61dc067074
Author: dingshuangxi888 <[email protected]>
AuthorDate: Wed Jan 24 09:50:15 2024 +0800
[ISSUE #7774] Make the handle of ppv2 tlv more extendable (#7775)
* Fix ascii validation for ppv2 tlv.
* make the handle of ppv2 tlv extendable.
---
.../proxy/grpc/ProxyAndTlsProtocolNegotiator.java | 38 ++++++-------
.../remoting/MultiProtocolRemotingServer.java | 2 +-
.../http2proxy/HAProxyMessageForwarder.java | 23 +++++---
.../http2proxy/Http2ProtocolProxyHandler.java | 12 +++--
.../grpc/ProxyAndTlsProtocolNegotiatorTest.java | 49 +++++++++++++++++
.../http2proxy/HAProxyMessageForwarderTest.java | 47 ++++++++++++++++
.../http2proxy/Http2ProtocolProxyHandlerTest.java | 61 +++++++++++++++++++++
.../remoting/netty/NettyRemotingServer.java | 23 ++++----
.../remoting/netty/NettyRemotingServerTest.java | 63 ++++++++++++++++++++++
9 files changed, 276 insertions(+), 42 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index cdf33165d7..7c92866803 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -34,6 +34,7 @@ import
io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
import
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
@@ -41,7 +42,10 @@ import
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactor
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
-import java.nio.charset.StandardCharsets;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.HAProxyConstants;
@@ -55,11 +59,6 @@ import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-
public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator.ProtocolNegotiator {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -123,7 +122,7 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
}
}
- private static class ProxyAndTlsProtocolHandler extends
ByteToMessageDecoder {
+ private class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder {
private final GrpcHttp2ConnectionHandler grpcHandler;
@@ -156,7 +155,7 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
}
}
- private static class HAProxyMessageHandler extends
ChannelInboundHandlerAdapter {
+ private class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
private ProtocolNegotiationEvent pne =
InternalProtocolNegotiationEvent.getDefault();
@@ -193,16 +192,7 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT,
String.valueOf(msg.destinationPort()));
}
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
- msg.tlvs().forEach(tlv -> {
- Attributes.Key<String> key = AttributeKeys.valueOf(
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX +
String.format("%02x", tlv.typeByteValue()));
- byte[] valueBytes =
ByteBufUtil.getBytes(tlv.content());
- String value = StringUtils.trim(new String(valueBytes,
CharsetUtil.UTF_8));
- if
(!BinaryUtil.isAscii(value.getBytes(StandardCharsets.UTF_8))) {
- return;
- }
- builder.set(key, value);
- });
+ msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
}
pne = InternalProtocolNegotiationEvent
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
@@ -212,7 +202,17 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
}
}
- private static class TlsModeHandler extends ByteToMessageDecoder {
+ protected void handleHAProxyTLV(HAProxyTLV tlv, Attributes.Builder
builder) {
+ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
+ if (!BinaryUtil.isAscii(valueBytes)) {
+ return;
+ }
+ Attributes.Key<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x",
tlv.typeByteValue()));
+ builder.set(key, new String(valueBytes, CharsetUtil.UTF_8));
+ }
+
+ private class TlsModeHandler extends ByteToMessageDecoder {
private ProtocolNegotiationEvent pne =
InternalProtocolNegotiationEvent.getDefault();
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 12d728fff1..d7c2820b27 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -46,7 +46,7 @@ public class MultiProtocolRemotingServer extends
NettyRemotingServer {
private final NettyServerConfig nettyServerConfig;
private final RemotingProtocolHandler remotingProtocolHandler;
- private final Http2ProtocolProxyHandler http2ProtocolProxyHandler;
+ protected Http2ProtocolProxyHandler http2ProtocolProxyHandler;
public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig,
ChannelEventListener channelEventListener) {
super(nettyServerConfig, channelEventListener);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
index 8f139d3d9a..99cb99d530 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
@@ -29,6 +29,7 @@ import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.codec.haproxy.HAProxyTLV;
import io.netty.util.Attribute;
import io.netty.util.DefaultAttributeMap;
+import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -50,7 +51,7 @@ public class HAProxyMessageForwarder extends
ChannelInboundHandlerAdapter {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final Field FIELD_ATTRIBUTE =
- FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
+ FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
private final Channel outboundChannel;
@@ -111,19 +112,25 @@ public class HAProxyMessageForwarder extends
ChannelInboundHandlerAdapter {
destinationPort = Integer.parseInt(attributeValue);
}
if (StringUtils.startsWith(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
- String typeString = StringUtils.substringAfter(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
- ByteBuf byteBuf = Unpooled.buffer();
-
byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
- HAProxyTLV haProxyTLV = new
HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
- haProxyTLVs.add(haProxyTLV);
+ HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey,
attributeValue);
+ if (haProxyTLV != null) {
+ haProxyTLVs.add(haProxyTLV);
+ }
}
}
HAProxyProxiedProtocol proxiedProtocol =
AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
- HAProxyProxiedProtocol.TCP4;
+ HAProxyProxiedProtocol.TCP4;
HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2,
HAProxyCommand.PROXY,
- proxiedProtocol, sourceAddress, destinationAddress,
sourcePort, destinationPort, haProxyTLVs);
+ proxiedProtocol, sourceAddress, destinationAddress, sourcePort,
destinationPort, haProxyTLVs);
outboundChannel.writeAndFlush(message).sync();
}
+
+ protected HAProxyTLV buildHAProxyTLV(String attributeKey, String
attributeValue) throws DecoderException {
+ String typeString = StringUtils.substringAfter(attributeKey,
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
+ ByteBuf byteBuf = Unpooled.buffer();
+ byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
+ return new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index c37db92af4..7ce563b030 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -121,10 +121,7 @@ public class Http2ProtocolProxyHandler implements
ProtocolHandler {
}
final Channel outboundChannel = f.channel();
- if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
- ctx.pipeline().addLast(new
HAProxyMessageForwarder(outboundChannel));
-
outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
- }
+ configPipeline(inboundChannel, outboundChannel);
SslHandler sslHandler = null;
if (sslContext != null) {
@@ -132,4 +129,11 @@ public class Http2ProtocolProxyHandler implements
ProtocolHandler {
}
ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel,
sslHandler));
}
+
+ protected void configPipeline(Channel inboundChannel, Channel
outboundChannel) {
+ if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+ inboundChannel.pipeline().addLast(new
HAProxyMessageForwarder(outboundChannel));
+
outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
+ }
+ }
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java
new file mode 100644
index 0000000000..699491f03d
--- /dev/null
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiatorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc;
+
+import io.grpc.Attributes;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
+import io.grpc.netty.shaded.io.netty.buffer.Unpooled;
+import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyAndTlsProtocolNegotiatorTest {
+
+ private ProxyAndTlsProtocolNegotiator negotiator;
+
+ @Before
+ public void setUp() throws Exception {
+ ConfigurationManager.intConfig();
+ ConfigurationManager.getProxyConfig().setTlsTestModeEnable(true);
+ negotiator = new ProxyAndTlsProtocolNegotiator();
+ }
+
+ @Test
+ public void handleHAProxyTLV() {
+ ByteBuf content = Unpooled.buffer();
+ content.writeBytes("xxxx".getBytes(StandardCharsets.UTF_8));
+ HAProxyTLV haProxyTLV = new HAProxyTLV((byte) 0xE1, content);
+ negotiator.handleHAProxyTLV(haProxyTLV, Attributes.newBuilder());
+ }
+}
\ No newline at end of file
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java
new file mode 100644
index 0000000000..f57116f0da
--- /dev/null
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarderTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import org.apache.commons.codec.DecoderException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HAProxyMessageForwarderTest {
+
+ private HAProxyMessageForwarder haProxyMessageForwarder;
+
+ @Mock
+ private Channel outboundChannel;
+
+ @Before
+ public void setUp() throws Exception {
+ haProxyMessageForwarder = new HAProxyMessageForwarder(outboundChannel);
+ }
+
+ @Test
+ public void buildHAProxyTLV() throws DecoderException {
+ HAProxyTLV haProxyTLV =
haProxyMessageForwarder.buildHAProxyTLV("proxy_protocol_tlv_0xe1", "xxxx");
+ assert haProxyTLV != null;
+ assert haProxyTLV.typeByteValue() == (byte) 0xe1;
+ }
+}
\ No newline at end of file
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java
new file mode 100644
index 0000000000..bf03786d34
--- /dev/null
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandlerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+import org.apache.rocketmq.remoting.netty.AttributeKeys;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class Http2ProtocolProxyHandlerTest {
+
+ private Http2ProtocolProxyHandler http2ProtocolProxyHandler;
+ @Mock
+ private Channel inboundChannel;
+ @Mock
+ private ChannelPipeline inboundPipeline;
+ @Mock
+ private Channel outboundChannel;
+ @Mock
+ private ChannelPipeline outboundPipeline;
+
+ @Before
+ public void setUp() throws Exception {
+ http2ProtocolProxyHandler = new Http2ProtocolProxyHandler();
+ }
+
+ @Test
+ public void configPipeline() {
+
when(inboundChannel.hasAttr(eq(AttributeKeys.PROXY_PROTOCOL_ADDR))).thenReturn(true);
+ when(inboundChannel.pipeline()).thenReturn(inboundPipeline);
+
when(inboundPipeline.addLast(any(HAProxyMessageForwarder.class))).thenReturn(inboundPipeline);
+ when(outboundChannel.pipeline()).thenReturn(outboundPipeline);
+
when(outboundPipeline.addFirst(any(HAProxyMessageEncoder.class))).thenReturn(outboundPipeline);
+ http2ProtocolProxyHandler.configPipeline(inboundChannel,
outboundChannel);
+ }
+}
\ No newline at end of file
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 7213b0c24f..51f8b85009 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -44,6 +44,7 @@ import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
@@ -55,7 +56,6 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.List;
@@ -761,7 +761,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
}
- public static class HAProxyMessageHandler extends
ChannelInboundHandlerAdapter {
+ public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
@@ -795,14 +795,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
if (CollectionUtils.isNotEmpty(msg.tlvs())) {
msg.tlvs().forEach(tlv -> {
- AttributeKey<String> key = AttributeKeys.valueOf(
- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX +
String.format("%02x", tlv.typeByteValue()));
- byte[] valueBytes =
ByteBufUtil.getBytes(tlv.content());
- String value = StringUtils.trim(new String(valueBytes,
CharsetUtil.UTF_8));
- if
(!BinaryUtil.isAscii(value.getBytes(StandardCharsets.UTF_8))) {
- return;
- }
- channel.attr(key).set(value);
+ handleHAProxyTLV(tlv, channel);
});
}
} finally {
@@ -810,4 +803,14 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
}
}
+
+ protected void handleHAProxyTLV(HAProxyTLV tlv, Channel channel) {
+ byte[] valueBytes = ByteBufUtil.getBytes(tlv.content());
+ if (!BinaryUtil.isAscii(valueBytes)) {
+ return;
+ }
+ AttributeKey<String> key = AttributeKeys.valueOf(
+ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x",
tlv.typeByteValue()));
+ channel.attr(key).set(new String(valueBytes, CharsetUtil.UTF_8));
+ }
}
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java
new file mode 100644
index 0000000000..c69fcebd45
--- /dev/null
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingServerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.haproxy.HAProxyTLV;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.charset.StandardCharsets;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyRemotingServerTest {
+
+ private NettyRemotingServer nettyRemotingServer;
+
+ @Mock
+ private Channel channel;
+
+ @Mock
+ private Attribute attribute;
+
+ @Before
+ public void setUp() throws Exception {
+ NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
+ }
+
+ @Test
+ public void handleHAProxyTLV() {
+ when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);
+ doNothing().when(attribute).set(any());
+
+ ByteBuf content = Unpooled.buffer();
+ content.writeBytes("xxxx".getBytes(StandardCharsets.UTF_8));
+ HAProxyTLV haProxyTLV = new HAProxyTLV((byte) 0xE1, content);
+ nettyRemotingServer.handleHAProxyTLV(haProxyTLV, channel);
+ }
+}
\ No newline at end of file