diegosalvi commented on a change in pull request #8686:
URL: https://github.com/apache/pulsar/pull/8686#discussion_r530195356



##########
File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
##########
@@ -143,10 +150,56 @@ protected void initChannel(SocketChannel ch) throws Exception {
                 inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
             }
 
-
+            if (config.isHaProxyProtocolEnabled()) {
+                if (proxyConnection.hasHAProxyMessage()) {
+                    outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+                } else {
+                    if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
+                        InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                        String sourceAddress = clientAddress.getAddress().getHostAddress();
+                        int sourcePort = clientAddress.getPort();
+                        if (outboundChannel.localAddress() instanceof InetSocketAddress) {
+                            InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                            String destinationAddress = proxyAddress.getAddress().getHostAddress();
+                            int destinationPort = proxyAddress.getPort();
+                            HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
+                                    HAProxyProxiedProtocol.TCP4, sourceAddress, destinationAddress, sourcePort, destinationPort);
+                            outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg));
+                            msg.release();
+                        }
+                    }
+                }
+            }
         });
     }
 
+    private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
+        // Max length of v1 version proxy protocol message is 108
+        ByteBuf out = Unpooled.buffer(108);

Review comment:
       Actually, the maximum v1 record size is 107 by spec:
   ```
     - worst case (optional fields set to 0xff) :

         "PROXY UNKNOWN ffff:f...f:ffff ffff:f...f:ffff 65535 65535\r\n"
       => 5 + 1 + 7 + 1 + 39 + 1 + 39 + 1 + 5 + 1 + 5 + 2 = 107 chars

   [...]

   The receiver must wait for the CRLF sequence before starting to decode the
   addresses in order to ensure they are complete and properly parsed. If the CRLF
   sequence is not found in the first 107 characters, the receiver should declare
   the line invalid.
   ```
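
   The arithmetic quoted above can be checked with a small sketch. The class and method names here are hypothetical and not part of the PR; the code only rebuilds the worst-case line from the spec excerpt and measures it.

   ```java
   import java.util.Collections;

   // Hypothetical helper, not part of the PR: rebuilds the worst-case v1 line
   // described in the spec excerpt and reports its length.
   public final class ProxyV1MaxLength {

       /** Longest textual IPv6 address: eight groups of "ffff" joined by ':' (39 chars). */
       public static String worstCaseAddress() {
           return String.join(":", Collections.nCopies(8, "ffff"));
       }

       /** Worst-case v1 line, including the terminating CRLF. */
       public static String worstCaseLine() {
           String addr = worstCaseAddress();
           return "PROXY UNKNOWN " + addr + " " + addr + " 65535 65535\r\n";
       }

       public static void main(String[] args) {
           System.out.println(worstCaseLine().length());
       }
   }
   ```

   A 108-byte buffer still holds the longest line with one byte to spare, so the allocation is safe; it is the comment in the diff that is off by one.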

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
##########
@@ -118,6 +119,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
+        if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
+            ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());

Review comment:
       I think we should have the possibility to require proxy headers instead of handling them as _optional_.
   The specification itself states:
   ```
   The receiver MUST be configured to only receive the protocol described in this
   specification and MUST not try to guess whether the protocol header is present
   or not. This means that the protocol explicitly prevents port sharing between
   public and private access.
   ```
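
   A minimal sketch of what a "required" mode could look like, assuming the caller has already buffered at least the first 12 bytes of the connection. The class, method, and flag names are hypothetical and are not the PR's `OptionalProxyProtocolDecoder`; only the v1 prefix and the 12-byte v2 signature come from the PROXY protocol specification.

   ```java
   import java.nio.charset.StandardCharsets;

   // Hypothetical policy check, not part of the PR.
   public final class RequiredProxyHeaderSketch {

       // A v1 header line starts with the ASCII text "PROXY" followed by a space.
       private static final byte[] V1_PREFIX = "PROXY ".getBytes(StandardCharsets.US_ASCII);

       // A v2 header starts with this fixed 12-byte signature.
       private static final byte[] V2_SIGNATURE = {
           0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A
       };

       /** True if the buffered bytes begin with a v1 or v2 PROXY protocol header. */
       public static boolean hasProxyHeader(byte[] firstBytes) {
           return startsWith(firstBytes, V1_PREFIX) || startsWith(firstBytes, V2_SIGNATURE);
       }

       /**
        * Decides whether to keep the connection. With headerRequired set, a
        * connection without a header is rejected instead of being treated as
        * a plain client connection.
        */
       public static boolean accept(boolean headerRequired, byte[] firstBytes) {
           return !headerRequired || hasProxyHeader(firstBytes);
       }

       private static boolean startsWith(byte[] data, byte[] prefix) {
           if (data.length < prefix.length) {
               return false;
           }
           for (int i = 0; i < prefix.length; i++) {
               if (data[i] != prefix[i]) {
                   return false;
               }
           }
           return true;
       }
   }
   ```

   With `headerRequired` set to `true`, a client that connects directly and sends ordinary protocol bytes is refused, which matches the spec's intent that a port expecting the header is not shared with direct access.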

##########
File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
##########
@@ -143,10 +150,56 @@ protected void initChannel(SocketChannel ch) throws Exception {
                 inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
             }
 
-
+            if (config.isHaProxyProtocolEnabled()) {
+                if (proxyConnection.hasHAProxyMessage()) {
+                    outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()));
+                } else {
+                    if (inboundChannel.remoteAddress() instanceof InetSocketAddress) {
+                        InetSocketAddress clientAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                        String sourceAddress = clientAddress.getAddress().getHostAddress();
+                        int sourcePort = clientAddress.getPort();
+                        if (outboundChannel.localAddress() instanceof InetSocketAddress) {
+                            InetSocketAddress proxyAddress = (InetSocketAddress) inboundChannel.remoteAddress();
+                            String destinationAddress = proxyAddress.getAddress().getHostAddress();
+                            int destinationPort = proxyAddress.getPort();
+                            HAProxyMessage msg = new HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,

Review comment:
       netty-haproxy can decode the binary v2 version too (but doesn't implement encoding).
   We could add another encoding method for the v2 protocol and let the configuration select which one to use.
   
   I've written a v2 encoder for subethamail, if you want to take a peek. It was written for testing purposes (so it permits some strange behaviour to check various edge cases and "wrong" headers, and it can surely be simplified):
   https://github.com/davidmoten/subethasmtp/blob/master/src/test/java/org/subethamail/smtp/internal/proxy/ProxyProtocolV2HandlerTest.java
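
   For orientation, a v2 header for the TCP-over-IPv4 case could be encoded roughly as below. This is a sketch under stated assumptions, not the subethamail code linked above and not a netty-haproxy API: the class and method names are hypothetical, addresses are passed as raw 4-byte arrays, and TLVs, IPv6, and the LOCAL command are left out. The byte layout (12-byte signature, version/command byte, family/transport byte, 2-byte length, then addresses and ports in network byte order) follows the PROXY protocol specification.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical v2 encoder sketch, not part of the PR.
   public final class ProxyV2EncoderSketch {

       private static final byte[] SIGNATURE = {
           0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A
       };

       private static final int FIXED_HEADER_LENGTH = 16; // signature + 4 bytes
       private static final int TCP4_ADDRESS_LENGTH = 12; // 4 + 4 + 2 + 2

       /** Encodes a v2 PROXY command header for a TCP connection over IPv4. */
       public static byte[] encodeTcp4(byte[] src, byte[] dst, int srcPort, int dstPort) {
           if (src.length != 4 || dst.length != 4) {
               throw new IllegalArgumentException("IPv4 addresses must be 4 bytes");
           }
           if (srcPort < 0 || srcPort > 65535 || dstPort < 0 || dstPort > 65535) {
               throw new IllegalArgumentException("ports must be in 0..65535");
           }
           // ByteBuffer is big-endian by default, which is network byte order.
           ByteBuffer buf = ByteBuffer.allocate(FIXED_HEADER_LENGTH + TCP4_ADDRESS_LENGTH);
           buf.put(SIGNATURE);
           buf.put((byte) 0x21);                       // version 2, command PROXY
           buf.put((byte) 0x11);                       // AF_INET, STREAM
           buf.putShort((short) TCP4_ADDRESS_LENGTH);  // length of what follows
           buf.put(src);
           buf.put(dst);
           buf.putShort((short) srcPort);
           buf.putShort((short) dstPort);
           return buf.array();
       }
   }
   ```

   Unlike v1, the v2 header has a fixed size for a given address family (28 bytes here), so the buffer can be allocated exactly.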




