This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9efee01ef28ca5e20c3f82b8a9661c3b347cfec1
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Thu Aug 14 17:11:45 2025 +0300

    [fix][proxy] Fix TooLongFrameException with Pulsar Proxy (#24626)
    
    (cherry picked from commit cde4948044767033e40bc7d4eff98a4c25fd3aff)
---
 .../broker/service/PulsarChannelInitializer.java   |  6 +-
 .../pulsar/client/api/MockBrokerService.java       |  4 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  5 +-
 .../client/impl/PulsarChannelInitializer.java      |  8 +--
 .../pulsar/common/protocol/FrameDecoderUtil.java   | 67 ++++++++++++++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    | 25 +++-----
 .../proxy/server/ServiceChannelInitializer.java    |  7 +--
 7 files changed, 86 insertions(+), 36 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index f15f6d67766..7a8d05b906a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.flow.FlowControlHandler;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
@@ -34,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
@@ -122,8 +121,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new 
OptionalProxyProtocolDecoder());
         }
-        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-            brokerConf.getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+        FrameDecoderUtil.addFrameDecoder(ch.pipeline(), 
brokerConf.getMaxMessageSize());
         // 
https://stackoverflow.com/questions/37535482/netty-disabling-auto-read-doesnt-work-for-bytetomessagedecoder
         // Classes such as {@link ByteToMessageDecoder} or {@link 
MessageToByteEncoder} are free to emit as many events
         // as they like for any given input. so, disabling auto-read on 
`ByteToMessageDecoder` doesn't work properly and
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index 9ca0bafe00b..cb55e9c48b9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -28,7 +28,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -69,6 +68,7 @@ import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -338,7 +338,7 @@ public class MockBrokerService {
             bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
-                    ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
+                    FrameDecoderUtil.addFrameDecoder(ch.pipeline(), 
MaxMessageSize);
                     ch.pipeline().addLast("handler", new MockServerCnx());
                 }
             });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index ffba7fab31d..effd5a85e64 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,7 +29,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
 import io.netty.util.concurrent.Promise;
 import io.opentelemetry.api.common.Attributes;
@@ -102,6 +101,7 @@ import 
org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.lookup.GetTopicsResult;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -397,8 +397,7 @@ public class ClientCnx extends PulsarHandler {
                           + "server frame size {}", ctx.channel(), 
connected.getMaxMessageSize());
             }
             maxMessageSize = connected.getMaxMessageSize();
-            ctx.pipeline().replace("frameDecoder", "newFrameDecoder", new 
LengthFieldBasedFrameDecoder(
-                connected.getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+            FrameDecoderUtil.replaceFrameDecoder(ctx.pipeline(), 
connected.getMaxMessageSize());
         }
         if (log.isDebugEnabled()) {
             log.debug("{} Connection is ready", ctx.channel());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index dff423d19fb..ca79caa4ca0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.handler.ssl.SslContext;
@@ -42,6 +41,7 @@ import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ObjectCache;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.util.SecurityUtility;
 import 
org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 import org.apache.pulsar.common.util.netty.NettyFutureUtil;
@@ -149,14 +149,12 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
         // Setup channel except for the SsHandler for TLS enabled connections
         ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.getEncoder(tlsEnabled));
-
-        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-                Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+        FrameDecoderUtil.addFrameDecoder(ch.pipeline(), 
Commands.DEFAULT_MAX_MESSAGE_SIZE);
         ChannelHandler clientCnx = clientCnxSupplier.get();
         ch.pipeline().addLast("handler", clientCnx);
     }
 
-   /**
+    /**
      * Initialize TLS for a channel. Should be invoked before the channel is 
connected to the remote address.
      *
      * @param ch      the channel
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java
new file mode 100644
index 00000000000..aee9f2c39bd
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/FrameDecoderUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import lombok.experimental.UtilityClass;
+
+/**
+ * Utility class for managing Netty LengthFieldBasedFrameDecoder instances in 
a Netty ChannelPipeline
+ * for the Pulsar binary protocol.
+ */
+@UtilityClass
+public class FrameDecoderUtil {
+    public static final String FRAME_DECODER_HANDLER = "frameDecoder";
+
+    /**
+     * Adds a LengthFieldBasedFrameDecoder to the given ChannelPipeline.
+     *
+     * @param pipeline the ChannelPipeline to which the decoder will be added
+     * @param maxMessageSize the maximum size of messages that can be decoded
+     */
+    public static void addFrameDecoder(ChannelPipeline pipeline, int 
maxMessageSize) {
+        pipeline.addLast(FRAME_DECODER_HANDLER, 
createFrameDecoder(maxMessageSize));
+    }
+
+    /**
+     * Replaces the existing LengthFieldBasedFrameDecoder in the given 
ChannelPipeline with a new one.
+     *
+     * @param pipeline the ChannelPipeline in which the decoder will be 
replaced
+     * @param maxMessageSize the maximum size of messages that can be decoded
+     */
+    public static void replaceFrameDecoder(ChannelPipeline pipeline, int 
maxMessageSize) {
+        pipeline.replace(FRAME_DECODER_HANDLER, FRAME_DECODER_HANDLER, 
createFrameDecoder(maxMessageSize));
+    }
+
+    /**
+     * Removes the LengthFieldBasedFrameDecoder from the given ChannelPipeline.
+     * This is useful in the Pulsar Proxy to remove the decoder before direct 
proxying of messages without decoding.
+     *
+     * @param pipeline the ChannelPipeline from which the decoder will be 
removed
+     */
+    public static void removeFrameDecoder(ChannelPipeline pipeline) {
+        pipeline.remove(FRAME_DECODER_HANDLER);
+    }
+
+    private static LengthFieldBasedFrameDecoder createFrameDecoder(int 
maxMessageSize) {
+        return new LengthFieldBasedFrameDecoder(
+                maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 
4);
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 7443c5e67fb..b545bf9b0ca 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -34,7 +34,6 @@ import io.netty.channel.epoll.EpollChannelOption;
 import io.netty.channel.epoll.EpollMode;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.haproxy.HAProxyCommand;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
@@ -59,6 +58,7 @@ import 
org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
@@ -208,9 +208,7 @@ public class DirectProxyHandler {
                     ch.pipeline().addLast("readTimeoutHandler",
                             new ReadTimeoutHandler(brokerProxyReadTimeoutMs, 
TimeUnit.MILLISECONDS));
                 }
-                ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(
-                        service.getConfiguration().getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
-                        4));
+                FrameDecoderUtil.addFrameDecoder(ch.pipeline(), 
service.getConfiguration().getMaxMessageSize());
                 ch.pipeline().addLast("proxyOutboundHandler",
                         (ChannelHandler) new ProxyBackendHandler(config, 
protocolVersion, remoteHost, featureFlags));
             }
@@ -450,23 +448,16 @@ public class DirectProxyHandler {
                     log.debug("[{}] [{}] Removing decoder from pipeline", 
inboundChannel, outboundChannel);
                 }
                 // direct tcp proxy
-                inboundChannel.pipeline().remove("frameDecoder");
-                outboundChannel.pipeline().remove("frameDecoder");
+                FrameDecoderUtil.removeFrameDecoder(inboundChannel.pipeline());
+                
FrameDecoderUtil.removeFrameDecoder(outboundChannel.pipeline());
             } else {
                 // Enable parsing feature, proxyLogLevel(1 or 2)
                 // Add parser handler
                 if (connected.hasMaxMessageSize()) {
-                    inboundChannel.pipeline()
-                            .replace("frameDecoder", "newFrameDecoder",
-                                    new 
LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
-                                            + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                            0, 4, 0, 4));
-                    outboundChannel.pipeline().replace("frameDecoder", 
"newFrameDecoder",
-                            new LengthFieldBasedFrameDecoder(
-                                    connected.getMaxMessageSize()
-                                            + 
Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                    0, 4, 0, 4));
-
+                    
FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(),
+                            connected.getMaxMessageSize());
+                    
FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(),
+                            connected.getMaxMessageSize());
                     inboundChannel.pipeline().addBefore("handler", 
"inboundParser",
                             new ParserProxyHandler(service,
                                     ParserProxyHandler.FRONTEND_CONN,
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 19f4002ad52..acae088f1c7 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -20,14 +20,13 @@ package org.apache.pulsar.proxy.server;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslProvider;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
@@ -112,9 +111,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
         if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new 
OptionalProxyProtocolDecoder());
         }
-        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-                this.maxMessageSize + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 
4, 0, 4));
-
+        FrameDecoderUtil.addFrameDecoder(ch.pipeline(), maxMessageSize);
         ch.pipeline().addLast("handler", new ProxyConnection(proxyService, 
proxyService.getDnsAddressResolverGroup()));
     }
 }

Reply via email to