This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fde5b47e02e628e33d745c4fcaa3301afb34a9c9 Author: Lari Hotari <[email protected]> AuthorDate: Fri Apr 29 14:59:32 2022 +0300 [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366) * [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress * Add logging when execution is rejected (cherry picked from commit 4ae070c5b26be3bf92c007ca309bd4adbae5ce93) --- .../pulsar/proxy/server/DirectProxyHandler.java | 19 ++-- .../pulsar/proxy/server/ProxyConnection.java | 102 ++++++++++++++++----- 2 files changed, 92 insertions(+), 29 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 0cbceb191b1..a632f0e7372 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -80,9 +80,7 @@ public class DirectProxyHandler { private final ProxyService service; private final Runnable onHandshakeCompleteAction; - public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort, - InetSocketAddress targetBrokerAddress, int protocolVersion, - Supplier<SslHandler> sslHandlerSupplier) { + public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) { this.service = service; this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); @@ -92,6 +90,10 @@ public class DirectProxyHandler { this.clientAuthData = proxyConnection.clientAuthData; this.clientAuthMethod = proxyConnection.clientAuthMethod; this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask; + } + + public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress, + int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) { ProxyConfiguration config = service.getConfiguration(); // Start the connection attempt. @@ -208,6 +210,12 @@ public class DirectProxyHandler { (byte) 'Y', }; + public void close() { + if (outboundChannel != null) { + outboundChannel.close(); + } + } + enum BackendState { Init, HandshakeCompleted } @@ -344,10 +352,7 @@ public class DirectProxyHandler { onHandshakeCompleteAction.run(); startDirectProxying(connected); - int maxMessageSize = - connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; - inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) - .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + proxyConnection.brokerConnected(DirectProxyHandler.this, connected); } private void startDirectProxying(CommandConnected connected) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 99f8f04ea84..33dc1abd88a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -19,6 +19,7 @@ package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -26,10 +27,12 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.ssl.SslHandler; import io.netty.resolver.dns.DnsNameResolver; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandConnect; +import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandGetSchema; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandLookupTopic; @@ -108,6 +112,9 @@ public class ProxyConnection extends PulsarHandler { // Follow redirects ProxyLookupRequests, + // Connecting to the broker + ProxyConnectingToBroker, + // If we are proxying a connection to a specific broker, we // are just forwarding data between the 2 connections, without // looking into it @@ -161,8 +168,8 @@ public class ProxyConnection extends PulsarHandler { public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { - directProxyHandler.outboundChannel.close(); + if (directProxyHandler != null) { + directProxyHandler.close(); directProxyHandler = null; } @@ -183,11 +190,22 @@ public class ProxyConnection extends PulsarHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - state = State.Closing; super.exceptionCaught(ctx, cause); - LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), + LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(), + cause.getMessage(), state, ClientCnx.isKnownException(cause) ? null : cause); - ctx.close(); + if (state != State.Closed) { + state = State.Closing; + } + if (ctx.channel().isOpen()) { + ctx.close(); + } else { + // close connection to broker if that is present + if (directProxyHandler != null) { + directProxyHandler.close(); + directProxyHandler = null; + } + } } @Override @@ -216,18 +234,26 @@ public class ProxyConnection extends PulsarHandler { break; case ProxyConnectionToBroker: - // Pass the buffer to the outbound connection and schedule next read - // only if we can write on the connection - ProxyService.OPS_COUNTER.inc(); - if (msg instanceof ByteBuf) { - int bytes = ((ByteBuf) msg).readableBytes(); - directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes); - ProxyService.BYTES_COUNTER.inc(bytes); + if (directProxyHandler != null) { + ProxyService.OPS_COUNTER.inc(); + if (msg instanceof ByteBuf) { + int bytes = ((ByteBuf) msg).readableBytes(); + directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes); + ProxyService.BYTES_COUNTER.inc(bytes); + } + directProxyHandler.outboundChannel.writeAndFlush(msg) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } else { + LOG.warn("Received message of type {} while connection to broker is missing in state {}. " + + "Dropping the input message (readable bytes={}).", msg.getClass(), state, + msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1); } - directProxyHandler.outboundChannel.writeAndFlush(msg) - .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); break; - + case ProxyConnectingToBroker: + LOG.warn("Received message of type {} while connecting to broker. " + + "Dropping the input message (readable bytes={}).", msg.getClass(), + msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1); + break; default: break; } @@ -273,14 +299,9 @@ public class ProxyConnection extends PulsarHandler { return; } + state = State.ProxyConnectingToBroker; brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl) - .thenAcceptAsync(address -> { - // Client already knows which broker to connect. Let's open a - // connection there and just pass bytes in both directions - state = State.ProxyConnectionToBroker; - directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address, - protocolVersionToAdvertise, sslHandlerSupplier); - }, ctx.executor()) + .thenAcceptAsync(this::connectToBroker, ctx.executor()) .exceptionally(throwable -> { if (throwable instanceof TargetAddressDeniedException || throwable.getCause() instanceof TargetAddressDeniedException) { @@ -313,6 +334,43 @@ public class ProxyConnection extends PulsarHandler { } } + private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) { + checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop"); + if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) { + this.directProxyHandler = directProxyHandler; + state = State.ProxyConnectionToBroker; + int maxMessageSize = + connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; + ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } else { + LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. " + + "Closing connection to broker '{}'.", + remoteAddress, ctx.channel().isOpen() ? "open" : "already closed", + state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state, + proxyToBrokerUrl); + directProxyHandler.close(); + ctx.close(); + } + } + + private void connectToBroker(InetSocketAddress brokerAddress) { + checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop"); + DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this); + directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, + protocolVersionToAdvertise, sslHandlerSupplier); + } + + public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) { + try { + final CommandConnected finalConnected = new CommandConnected().copyFrom(connected); + ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected)); + } catch (RejectedExecutionException e) { + LOG.error("Event loop was already closed. Closing broker connection.", e); + directProxyHandler.close(); + } + } + // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { AuthData brokerData = authState.authenticate(clientData);
