diegosalvi commented on a change in pull request #8686:
URL: https://github.com/apache/pulsar/pull/8686#discussion_r530195356
##########
File path:
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
##########
@@ -143,10 +150,56 @@ protected void initChannel(SocketChannel ch) throws
Exception {
inboundOutboundChannelMap.put(outboundChannel.id() ,
inboundChannel.id());
}
-
+ if (config.isHaProxyProtocolEnabled()) {
+ if (proxyConnection.hasHAProxyMessage()) {
+
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+ } else {
+ if (inboundChannel.remoteAddress() instanceof
InetSocketAddress) {
+ InetSocketAddress clientAddress = (InetSocketAddress)
inboundChannel.remoteAddress();
+ String sourceAddress =
clientAddress.getAddress().getHostAddress();
+ int sourcePort = clientAddress.getPort();
+ if (outboundChannel.localAddress() instanceof
InetSocketAddress) {
+ InetSocketAddress proxyAddress =
(InetSocketAddress) inboundChannel.remoteAddress();
+ String destinationAddress =
proxyAddress.getAddress().getHostAddress();
+ int destinationPort = proxyAddress.getPort();
+ HAProxyMessage msg = new
HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+ HAProxyProxiedProtocol.TCP4,
sourceAddress, destinationAddress, sourcePort, destinationPort);
+
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+ msg.release();
+ }
+ }
+ }
+ }
});
}
+ private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
+ // Max length of v1 version proxy protocol message is 108
+ ByteBuf out = Unpooled.buffer(108);
Review comment:
Actually maximum record size is 107 by spec:
- worst case (optional fields set to 0xff) :```
"PROXY UNKNOWN ffff:f...f:ffff ffff:f...f:ffff 65535 65535\r\n"
=> 5 + 1 + 7 + 1 + 39 + 1 + 39 + 1 + 5 + 1 + 5 + 2 = 107 chars
[...]
The receiver must wait for the CRLF sequence before starting to decode the
addresses in order to ensure they are complete and properly parsed. If the
CRLF
sequence is not found in the first 107 characters, the receiver should
declare
the line invalid.
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
##########
@@ -118,6 +119,9 @@ protected void initChannel(SocketChannel ch) throws
Exception {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
+ if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
+ ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new
OptionalProxyProtocolDecoder());
Review comment:
I think we should have the possibilitiy to require proxy headers instead
of handle them as _optional_.
Specification itself states:
```
The receiver MUST be configured to only receive the protocol described in
this
specification and MUST not try to guess whether the protocol header is
present
or not. This means that the protocol explicitly prevents port sharing between
public and private access.
```
##########
File path:
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
##########
@@ -143,10 +150,56 @@ protected void initChannel(SocketChannel ch) throws
Exception {
inboundOutboundChannelMap.put(outboundChannel.id() ,
inboundChannel.id());
}
-
+ if (config.isHaProxyProtocolEnabled()) {
+ if (proxyConnection.hasHAProxyMessage()) {
+
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+ } else {
+ if (inboundChannel.remoteAddress() instanceof
InetSocketAddress) {
+ InetSocketAddress clientAddress = (InetSocketAddress)
inboundChannel.remoteAddress();
+ String sourceAddress =
clientAddress.getAddress().getHostAddress();
+ int sourcePort = clientAddress.getPort();
+ if (outboundChannel.localAddress() instanceof
InetSocketAddress) {
+ InetSocketAddress proxyAddress =
(InetSocketAddress) inboundChannel.remoteAddress();
+ String destinationAddress =
proxyAddress.getAddress().getHostAddress();
+ int destinationPort = proxyAddress.getPort();
+ HAProxyMessage msg = new
HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
Review comment:
netty-haproxy can decode binary v2 version too (but doesn't implements
encoding).
We could add another encoding method for v2 protocol and let the configurer
choose which use.
For the encoding implementation I've done one for subethamail if you want
take a peek. It has been done for testing purposes (so it permits some strange
behaviour to check various edge cases and "wrong" headers and can surely be
simplified):
https://github.com/davidmoten/subethasmtp/blob/master/src/test/java/org/subethamail/smtp/internal/proxy/ProxyProtocolV2HandlerTest.java
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]