This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1ddf497ff504200d1dec8d188953e62332353741 Author: Lari Hotari <[email protected]> AuthorDate: Sat Nov 15 10:11:13 2025 +0200 [fix] Handle TLS close_notify to avoid SslClosedEngineException: SSLEngine closed already (#24986) (cherry picked from commit ee3e5eaa9f9364287061a70dbb41f9bc9c2adb0c) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 13 ------------ .../pulsar/common/protocol/PulsarDecoder.java | 24 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index e3abee3f6ec..17760b4194b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -29,7 +29,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -1378,18 +1377,6 @@ public class ClientCnx extends PulsarHandler { } } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof SslHandshakeCompletionEvent) { - SslHandshakeCompletionEvent sslHandshakeCompletionEvent = (SslHandshakeCompletionEvent) evt; - if (sslHandshakeCompletionEvent.cause() != null) { - log.warn("{} Got ssl handshake exception {}", ctx.channel(), - sslHandshakeCompletionEvent); - } - } - ctx.fireUserEventTriggered(evt); - } - protected void closeWithException(Throwable e) { if (ctx != null) { connectionFuture.completeExceptionally(e); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index ed346dd0e0b..cbae071a876 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundInvoker; import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.ssl.SslCloseCompletionEvent; +import io.netty.handler.ssl.SslHandshakeCompletionEvent; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAckResponse; @@ -748,4 +750,26 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) { NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd); } + + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof SslHandshakeCompletionEvent) { + // log handshake failures + SslHandshakeCompletionEvent sslHandshakeCompletionEvent = (SslHandshakeCompletionEvent) evt; + if (!sslHandshakeCompletionEvent.isSuccess()) { + log.warn("[{}] TLS handshake failed. {}", ctx.channel(), sslHandshakeCompletionEvent); + } + } else if (evt instanceof SslCloseCompletionEvent) { + // handle TLS close_notify event and immediately close the channel + // this is not handled by Netty by default + // See https://datatracker.ietf.org/doc/html/rfc8446#section-6.1 for more details + SslCloseCompletionEvent sslCloseCompletionEvent = (SslCloseCompletionEvent) evt; + if (sslCloseCompletionEvent.isSuccess() && ctx.channel().isActive()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Received a TLS close_notify, closing the channel.", ctx.channel()); + } + ctx.close(); + } + } + ctx.fireUserEventTriggered(evt); + } }
