This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9efee01ef28ca5e20c3f82b8a9661c3b347cfec1 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Thu Aug 14 17:11:45 2025 +0300 [fix][proxy] Fix TooLongFrameException with Pulsar Proxy (#24626) (cherry picked from commit cde4948044767033e40bc7d4eff98a4c25fd3aff) --- .../broker/service/PulsarChannelInitializer.java | 6 +- .../pulsar/client/api/MockBrokerService.java | 4 +- .../org/apache/pulsar/client/impl/ClientCnx.java | 5 +- .../client/impl/PulsarChannelInitializer.java | 8 +-- .../pulsar/common/protocol/FrameDecoderUtil.java | 67 ++++++++++++++++++++++ .../pulsar/proxy/server/DirectProxyHandler.java | 25 +++----- .../proxy/server/ServiceChannelInitializer.java | 7 +-- 7 files changed, 86 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index f15f6d67766..7a8d05b906a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.flow.FlowControlHandler; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; @@ -34,7 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.protocol.ByteBufPair; -import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.FrameDecoderUtil; import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder; import 
org.apache.pulsar.common.util.NettyServerSslContextBuilder; import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder; @@ -122,8 +121,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); } - ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( - brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); + FrameDecoderUtil.addFrameDecoder(ch.pipeline(), brokerConf.getMaxMessageSize()); // https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder // Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as many events // as they like for any given input. so, disabling auto-read on `ByteToMessageDecoder` doesn't work properly and diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java index 9ca0bafe00b..cb55e9c48b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.io.IOException; import java.net.InetSocketAddress; @@ -69,6 +68,7 @@ import org.apache.pulsar.common.api.proto.CommandUnsubscribe; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.FrameDecoderUtil; import 
org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -338,7 +338,7 @@ public class MockBrokerService { bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4)); + FrameDecoderUtil.addFrameDecoder(ch.pipeline(), MaxMessageSize); ch.pipeline().addLast("handler", new MockServerCnx()); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index ffba7fab31d..effd5a85e64 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -29,7 +29,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.util.concurrent.Promise; import io.opentelemetry.api.common.Attributes; @@ -102,6 +101,7 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.FrameDecoderUtil; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; @@ -397,8 +397,7 @@ public class ClientCnx extends PulsarHandler { + "server frame size {}", ctx.channel(), connected.getMaxMessageSize()); } maxMessageSize = 
connected.getMaxMessageSize(); - ctx.pipeline().replace("frameDecoder", "newFrameDecoder", new LengthFieldBasedFrameDecoder( - connected.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); + FrameDecoderUtil.replaceFrameDecoder(ctx.pipeline(), connected.getMaxMessageSize()); } if (log.isDebugEnabled()) { log.debug("{} Connection is ready", ctx.channel()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index dff423d19fb..ca79caa4ca0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -22,7 +22,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslContext; @@ -42,6 +41,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ObjectCache; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.FrameDecoderUtil; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder; import org.apache.pulsar.common.util.netty.NettyFutureUtil; @@ -149,14 +149,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> // Setup channel except for the SsHandler for TLS enabled connections ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled)); - - ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder( - Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); + FrameDecoderUtil.addFrameDecoder(ch.pipeline(), Commands.DEFAULT_MAX_MESSAGE_SIZE); ChannelHandler clientCnx = clientCnxSupplier.get(); ch.pipeline().addLast("handler", clientCnx); } - /** + /** * Initialize TLS for a channel. Should be invoked before the channel is connected to the remote address. * * @param ch the channel diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java new file mode 100644 index 00000000000..aee9f2c39bd --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import lombok.experimental.UtilityClass; + +/** + * Utility class for managing Netty LengthFieldBasedFrameDecoder instances in a Netty ChannelPipeline + * for the Pulsar binary protocol. 
+ */ +@UtilityClass +public class FrameDecoderUtil { + public static final String FRAME_DECODER_HANDLER = "frameDecoder"; + + /** + * Adds a LengthFieldBasedFrameDecoder to the given ChannelPipeline. + * + * @param pipeline the ChannelPipeline to which the decoder will be added + * @param maxMessageSize the maximum size of messages that can be decoded + */ + public static void addFrameDecoder(ChannelPipeline pipeline, int maxMessageSize) { + pipeline.addLast(FRAME_DECODER_HANDLER, createFrameDecoder(maxMessageSize)); + } + + /** + * Replaces the existing LengthFieldBasedFrameDecoder in the given ChannelPipeline with a new one. + * + * @param pipeline the ChannelPipeline in which the decoder will be replaced + * @param maxMessageSize the maximum size of messages that can be decoded + */ + public static void replaceFrameDecoder(ChannelPipeline pipeline, int maxMessageSize) { + pipeline.replace(FRAME_DECODER_HANDLER, FRAME_DECODER_HANDLER, createFrameDecoder(maxMessageSize)); + } + + /** + * Removes the LengthFieldBasedFrameDecoder from the given ChannelPipeline. + * This is useful in the Pulsar Proxy to remove the decoder before direct proxying of messages without decoding. 
+ * + * @param pipeline the ChannelPipeline from which the decoder will be removed + */ + public static void removeFrameDecoder(ChannelPipeline pipeline) { + pipeline.remove(FRAME_DECODER_HANDLER); + } + + private static LengthFieldBasedFrameDecoder createFrameDecoder(int maxMessageSize) { + return new LengthFieldBasedFrameDecoder( + maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 7443c5e67fb..b545bf9b0ca 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -34,7 +34,6 @@ import io.netty.channel.epoll.EpollChannelOption; import io.netty.channel.epoll.EpollMode; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.haproxy.HAProxyCommand; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; @@ -59,6 +58,7 @@ import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.FrameDecoderUtil; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.NettyClientSslContextRefresher; @@ -208,9 +208,7 @@ public class DirectProxyHandler { ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); } - ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( - 
service.getConfiguration().getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, - 4)); + FrameDecoderUtil.addFrameDecoder(ch.pipeline(), service.getConfiguration().getMaxMessageSize()); ch.pipeline().addLast("proxyOutboundHandler", (ChannelHandler) new ProxyBackendHandler(config, protocolVersion, remoteHost, featureFlags)); } @@ -450,23 +448,16 @@ public class DirectProxyHandler { log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel); } // direct tcp proxy - inboundChannel.pipeline().remove("frameDecoder"); - outboundChannel.pipeline().remove("frameDecoder"); + FrameDecoderUtil.removeFrameDecoder(inboundChannel.pipeline()); + FrameDecoderUtil.removeFrameDecoder(outboundChannel.pipeline()); } else { // Enable parsing feature, proxyLogLevel(1 or 2) // Add parser handler if (connected.hasMaxMessageSize()) { - inboundChannel.pipeline() - .replace("frameDecoder", "newFrameDecoder", - new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() - + Commands.MESSAGE_SIZE_FRAME_PADDING, - 0, 4, 0, 4)); - outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", - new LengthFieldBasedFrameDecoder( - connected.getMaxMessageSize() - + Commands.MESSAGE_SIZE_FRAME_PADDING, - 0, 4, 0, 4)); - + FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(), + connected.getMaxMessageSize()); + FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(), + connected.getMaxMessageSize()); inboundChannel.pipeline().addBefore("handler", "inboundParser", new ParserProxyHandler(service, ParserProxyHandler.FRONTEND_CONN, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index 19f4002ad52..acae088f1c7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -20,14 +20,13 @@ package org.apache.pulsar.proxy.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.FrameDecoderUtil; import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder; import org.apache.pulsar.common.util.NettyServerSslContextBuilder; import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder; @@ -112,9 +111,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); } - ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( - this.maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); - + FrameDecoderUtil.addFrameDecoder(ch.pipeline(), maxMessageSize); ch.pipeline().addLast("handler", new ProxyConnection(proxyService, proxyService.getDnsAddressResolverGroup())); } }