This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8115076c5111f7bcddb010639d787c38cf0b8f82 Author: Lari Hotari <[email protected]> AuthorDate: Thu Apr 28 19:43:13 2022 +0300 [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366) - backports https://github.com/apache/pulsar/pull/15366 to branch-2.7 (cherry picked from commit 4621ca63fcaabf3a0faefd487434dbd97c1d8859) --- .../pulsar/proxy/server/DirectProxyHandler.java | 19 ++-- .../pulsar/proxy/server/ProxyConnection.java | 103 ++++++++++++++++----- 2 files changed, 92 insertions(+), 30 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 8fa7787215d..64ce8c68b27 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -80,9 +80,7 @@ public class DirectProxyHandler { private final ProxyService service; private final Runnable onHandshakeCompleteAction; - public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort, - InetSocketAddress targetBrokerAddress, int protocolVersion, - Supplier<SslHandler> sslHandlerSupplier) { + public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) { this.service = service; this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); @@ -92,6 +90,10 @@ public class DirectProxyHandler { this.clientAuthData = proxyConnection.clientAuthData; this.clientAuthMethod = proxyConnection.clientAuthMethod; this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask; + } + + public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress, + int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) { ProxyConfiguration config = service.getConfiguration(); // Start the connection attempt. @@ -208,6 +210,12 @@ public class DirectProxyHandler { (byte) 'Y', }; + public void close() { + if (outboundChannel != null) { + outboundChannel.close(); + } + } + enum BackendState { Init, HandshakeCompleted } @@ -344,10 +352,7 @@ public class DirectProxyHandler { onHandshakeCompleteAction.run(); startDirectProxying(connected); - int maxMessageSize = - connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; - inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) - .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + proxyConnection.brokerConnected(DirectProxyHandler.this, connected); } private void startDirectProxying(CommandConnected connected) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 395d16bbbd8..fbf604866b8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -19,14 +19,16 @@ package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkArgument; - +import static com.google.common.base.Preconditions.checkState; import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.resolver.dns.DnsNameResolver; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -51,6 +53,7 @@ import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema; @@ -113,6 +116,9 @@ public class ProxyConnection extends PulsarHandler { // Follow redirects ProxyLookupRequests, + // Connecting to the broker + ProxyConnectingToBroker, + // If we are proxying a connection to a specific broker, we // are just forwarding data between the 2 connections, without // looking into it @@ -166,8 +172,8 @@ public class ProxyConnection extends PulsarHandler { public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { - directProxyHandler.outboundChannel.close(); + if (directProxyHandler != null) { + directProxyHandler.close(); directProxyHandler = null; } @@ -188,11 +194,22 @@ public class ProxyConnection extends PulsarHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - state = State.Closing; super.exceptionCaught(ctx, cause); - LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), + LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(), + cause.getMessage(), state, ClientCnx.isKnownException(cause) ? null : cause); - ctx.close(); + if (state != State.Closed) { + state = State.Closing; + } + if (ctx.channel().isOpen()) { + ctx.close(); + } else { + // close connection to broker if that is present + if (directProxyHandler != null) { + directProxyHandler.close(); + directProxyHandler = null; + } + } } @Override @@ -221,18 +238,26 @@ public class ProxyConnection extends PulsarHandler { break; case ProxyConnectionToBroker: - // Pass the buffer to the outbound connection and schedule next read - // only if we can write on the connection - ProxyService.opsCounter.inc(); - if (msg instanceof ByteBuf) { - int bytes = ((ByteBuf) msg).readableBytes(); - directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes); - ProxyService.bytesCounter.inc(bytes); + if (directProxyHandler != null) { + ProxyService.opsCounter.inc(); + if (msg instanceof ByteBuf) { + int bytes = ((ByteBuf) msg).readableBytes(); + directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes); + ProxyService.bytesCounter.inc(bytes); + } + directProxyHandler.outboundChannel.writeAndFlush(msg) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } else { + LOG.warn("Received message of type {} while connection to broker is missing in state {}. " + + "Dropping the input message (readable bytes={}).", msg.getClass(), state, + msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1); } - directProxyHandler.outboundChannel.writeAndFlush(msg) - .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); break; - + case ProxyConnectingToBroker: + LOG.warn("Received message of type {} while connecting to broker. " + + "Dropping the input message (readable bytes={}).", msg.getClass(), + msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1); + break; default: break; } @@ -278,14 +303,9 @@ public class ProxyConnection extends PulsarHandler { return; } + state = State.ProxyConnectingToBroker; brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl) - .thenAcceptAsync(address -> { - // Client already knows which broker to connect. Let's open a - // connection there and just pass bytes in both directions - state = State.ProxyConnectionToBroker; - directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address, - protocolVersionToAdvertise, sslHandlerSupplier); - }, ctx.executor()) + .thenAcceptAsync(this::connectToBroker, ctx.executor()) .exceptionally(throwable -> { if (throwable instanceof TargetAddressDeniedException || throwable.getCause() instanceof TargetAddressDeniedException) { @@ -318,6 +338,43 @@ public class ProxyConnection extends PulsarHandler { } } + private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) { + checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop"); + if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) { + this.directProxyHandler = directProxyHandler; + state = State.ProxyConnectionToBroker; + int maxMessageSize = + connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; + ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } else { + LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. " + + "Closing connection to broker '{}'.", + remoteAddress, ctx.channel().isOpen() ? "open" : "already closed", + state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state, + proxyToBrokerUrl); + directProxyHandler.close(); + ctx.close(); + } + } + + private void connectToBroker(InetSocketAddress brokerAddress) { + checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop"); + DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this); + directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, + protocolVersionToAdvertise, sslHandlerSupplier); + } + + public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) { + try { + final CommandConnected finalConnected = connected.toBuilder().build(); + ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected)); + } catch (RejectedExecutionException e) { + LOG.error("Event loop was already closed. Closing broker connection.", e); + directProxyHandler.close(); + } + } + // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { AuthData brokerData = authState.authenticate(clientData);
