This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fde5b47e02e628e33d745c4fcaa3301afb34a9c9
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 29 14:59:32 2022 +0300

    [Proxy] Fix proxy connection leak when inbound connection closes while 
connecting is in progress (#15366)
    
    * [Proxy] Fix proxy connection leak when inbound connection closes while 
connecting is in progress
    
    * Add logging when execution is rejected
    
    (cherry picked from commit 4ae070c5b26be3bf92c007ca309bd4adbae5ce93)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 ++--
 .../pulsar/proxy/server/ProxyConnection.java       | 102 ++++++++++++++++-----
 2 files changed, 92 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 0cbceb191b1..a632f0e7372 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -80,9 +80,7 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection, String brokerHostAndPort,
-                              InetSocketAddress targetBrokerAddress, int 
protocolVersion,
-                              Supplier<SslHandler> sslHandlerSupplier) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -92,6 +90,10 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
+    }
+
+    public void connect(String brokerHostAndPort, InetSocketAddress 
targetBrokerAddress,
+                           int protocolVersion, Supplier<SslHandler> 
sslHandlerSupplier) {
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -208,6 +210,12 @@ public class DirectProxyHandler {
             (byte) 'Y',
     };
 
+    public void close() {
+        if (outboundChannel != null) {
+            outboundChannel.close();
+        }
+    }
+
     enum BackendState {
         Init, HandshakeCompleted
     }
@@ -344,10 +352,7 @@ public class DirectProxyHandler {
             onHandshakeCompleteAction.run();
             startDirectProxying(connected);
 
-            int maxMessageSize =
-                    connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(),
 maxMessageSize))
-                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            proxyConnection.brokerConnected(DirectProxyHandler.this, 
connected);
         }
 
         private void startDirectProxying(CommandConnected connected) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 99f8f04ea84..33dc1abd88a 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
@@ -26,10 +27,12 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.resolver.dns.DnsNameResolver;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +53,7 @@ import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
@@ -108,6 +112,9 @@ public class ProxyConnection extends PulsarHandler {
         // Follow redirects
         ProxyLookupRequests,
 
+        // Connecting to the broker
+        ProxyConnectingToBroker,
+
         // If we are proxying a connection to a specific broker, we
         // are just forwarding data between the 2 connections, without
         // looking into it
@@ -161,8 +168,8 @@ public class ProxyConnection extends PulsarHandler {
     public synchronized void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
         super.channelInactive(ctx);
 
-        if (directProxyHandler != null && directProxyHandler.outboundChannel 
!= null) {
-            directProxyHandler.outboundChannel.close();
+        if (directProxyHandler != null) {
+            directProxyHandler.close();
             directProxyHandler = null;
         }
 
@@ -183,11 +190,22 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, 
cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", 
remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {
+                directProxyHandler.close();
+                directProxyHandler = null;
+            }
+        }
     }
 
     @Override
@@ -216,18 +234,26 @@ public class ProxyConnection extends PulsarHandler {
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next 
read
-            // only if we can write on the connection
-            ProxyService.OPS_COUNTER.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.BYTES_COUNTER.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.OPS_COUNTER.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.BYTES_COUNTER.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to 
broker is missing in state {}. "
+                                + "Dropping the input message (readable 
bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) 
msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable 
bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : 
-1);
+            break;
         default:
             break;
         }
@@ -273,14 +299,9 @@ public class ProxyConnection extends PulsarHandler {
                 return;
             }
 
+            state = State.ProxyConnectingToBroker;
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAcceptAsync(address -> {
-                        // Client already knows which broker to connect. Let's 
open a
-                        // connection there and just pass bytes in both 
directions
-                        state = State.ProxyConnectionToBroker;
-                        directProxyHandler = new DirectProxyHandler(service, 
this, proxyToBrokerUrl, address,
-                                protocolVersionToAdvertise, 
sslHandlerSupplier);
-                    }, ctx.executor())
+                    .thenAcceptAsync(this::connectToBroker, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof 
TargetAddressDeniedException) {
@@ -313,6 +334,43 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, 
CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called 
in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() 
&& this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), 
maxMessageSize))
+                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already 
closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " 
+ state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called 
in the event loop");
+        DirectProxyHandler directProxyHandler = new 
DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, 
CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new 
CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> 
handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            LOG.error("Event loop was already closed. Closing broker 
connection.", e);
+            directProxyHandler.close();
+        }
+    }
+
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);

Reply via email to