This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 94890cb00a6 [fix][broker] Restore per-message auto-read flow control 
broken by Netty 4.2.15 (#26013)
94890cb00a6 is described below

commit 94890cb00a6b79fa609eabba3369301fbf748ade
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 12 20:20:53 2026 +0300

    [fix][broker] Restore per-message auto-read flow control broken by Netty 
4.2.15 (#26013)
---
 .../broker/service/PulsarChannelInitializer.java   |   7 +-
 .../util/netty/PulsarFlowControlHandler.java       | 245 +++++++++++++++++++++
 .../util/netty/PulsarFlowControlHandlerTest.java   | 115 ++++++++++
 3 files changed, 365 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 37b06e30054..579e253f880 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.flow.FlowControlHandler;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslHandler;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +35,7 @@ import org.apache.pulsar.common.protocol.FrameDecoderUtil;
 import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
 import org.apache.pulsar.common.util.PulsarSslConfiguration;
 import org.apache.pulsar.common.util.PulsarSslFactory;
+import org.apache.pulsar.common.util.netty.PulsarFlowControlHandler;
 
 @CustomLog
 public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel> {
@@ -98,7 +98,10 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         // as they like for any given input. so, disabling auto-read on 
`ByteToMessageDecoder` doesn't work properly and
         // ServerCnx ends up reading higher number of messages and broker can 
not throttle the messages by disabling
         // auto-read.
-        ch.pipeline().addLast("flowController", new FlowControlHandler());
+        // PulsarFlowControlHandler is used instead of Netty's 
FlowControlHandler since Netty 4.2.15 changed the
+        // behavior to ignore setAutoRead(false) made by a downstream handler 
while queued messages are being
+        // delivered, which breaks throttling of already buffered messages. 
See PulsarFlowControlHandler javadoc.
+        ch.pipeline().addLast("flowController", new 
PulsarFlowControlHandler());
         // using "ChannelHandler" type to workaround an IntelliJ bug that 
shows a false positive error
         ChannelHandler cnx = newServerCnx(pulsar, listenerName);
         ch.pipeline().addLast("handler", cnx);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/PulsarFlowControlHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/PulsarFlowControlHandler.java
new file mode 100644
index 00000000000..22d9434e53e
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/PulsarFlowControlHandler.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.ObjectPool.Handle;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * The {@link PulsarFlowControlHandler} ensures that only one message per 
{@code read()} is sent downstream.
+ *
+ * <p>Classes such as {@link ByteToMessageDecoder} are free to emit as many 
events as they like for any given input.
+ * A channel's auto reading configuration doesn't usually apply in these 
scenarios. This is causing problems in
+ * downstream {@link ChannelHandler}s that would like to hold subsequent 
events while they're processing one event.
+ *
+ * <p>This class is derived from {@code 
io.netty.handler.flow.FlowControlHandler} of Netty 4.2.14.Final
+ * (Copyright 2016 The Netty Project, licensed under the Apache License, 
version 2.0), which is behaviorally
+ * identical to the implementation in Netty 4.1.135.Final. Up to Netty
+ * 4.1.135/4.2.14, the dequeue loop of Netty's handler re-checked {@link 
ChannelConfig#isAutoRead()} before
+ * releasing each queued message, so a downstream handler calling {@code 
setAutoRead(false)} from inside
+ * {@code channelRead} stopped the delivery of queued messages immediately. 
Netty 4.2.15 (netty/netty#16837,
+ * backport of netty/netty#15053; the equivalent 4.1 change is 
netty/netty#16912) rewrote the handler to decide
+ * once up front: with auto-read enabled it dequeues the entire queue and 
ignores auto-read changes made mid-drain
+ * by downstream handlers. Pulsar's reactive request throttling (see {@code 
ServerCnxThrottleTracker} in the broker)
+ * pauses a connection by disabling auto-read while processing a message and 
relies on the delivery of queued
+ * messages stopping immediately; with Netty's rewritten handler, the whole 
queued backlog would be delivered
+ * regardless, defeating throttling (and rate limiting) of already-buffered 
traffic. This copy preserves the
+ * pre-4.2.15 per-message auto-read behavior.
+ *
+ * @see ChannelConfig#setAutoRead(boolean)
+ */
+public class PulsarFlowControlHandler extends ChannelDuplexHandler {
+    private static final InternalLogger logger = 
InternalLoggerFactory.getInstance(PulsarFlowControlHandler.class);
+
+    private final boolean releaseMessages;
+
+    private RecyclableArrayDeque queue;
+
+    private ChannelConfig config;
+
+    private boolean shouldConsume;
+
+    public PulsarFlowControlHandler() {
+        this(true);
+    }
+
+    public PulsarFlowControlHandler(boolean releaseMessages) {
+        this.releaseMessages = releaseMessages;
+    }
+
+    /**
+     * Determine if the underlying {@link Queue} is empty. This method exists 
for
+     * testing, debugging and inspection purposes and it is not Thread safe!
+     */
+    boolean isQueueEmpty() {
+        return queue == null || queue.isEmpty();
+    }
+
+    /**
+     * Releases all messages and destroys the {@link Queue}.
+     */
+    private void destroy() {
+        if (queue != null) {
+
+            if (!queue.isEmpty()) {
+                logger.trace("Non-empty queue: {}", queue);
+
+                if (releaseMessages) {
+                    Object msg;
+                    while ((msg = queue.poll()) != null) {
+                        ReferenceCountUtil.safeRelease(msg);
+                    }
+                }
+            }
+
+            queue.recycle();
+            queue = null;
+        }
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        config = ctx.channel().config();
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        super.handlerRemoved(ctx);
+        if (!isQueueEmpty()) {
+            dequeue(ctx, queue.size());
+        }
+        destroy();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        destroy();
+        ctx.fireChannelInactive();
+    }
+
+    @Override
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        if (dequeue(ctx, 1) == 0) {
+            // It seems no messages were consumed. We need to read() some
+            // messages from upstream and once one arrives it need to be
+            // relayed to downstream to keep the flow going.
+            shouldConsume = true;
+            ctx.read();
+        } else if (config.isAutoRead()) {
+            ctx.read();
+        }
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        if (queue == null) {
+            queue = RecyclableArrayDeque.newInstance();
+        }
+
+        queue.offer(msg);
+
+        // We just received one message. Do we need to relay it regardless
+        // of the auto reading configuration? The answer is yes if this
+        // method was called as a result of a prior read() call.
+        int minConsume = shouldConsume ? 1 : 0;
+        shouldConsume = false;
+
+        dequeue(ctx, minConsume);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws 
Exception {
+        if (isQueueEmpty()) {
+            ctx.fireChannelReadComplete();
+        } else {
+            // Don't relay completion events from upstream as they
+            // make no sense in this context. See dequeue() where
+            // a new set of completion events is being produced.
+        }
+    }
+
+    /**
+     * Dequeues one or many (or none) messages depending on the channel's auto
+     * reading state and returns the number of messages that were consumed from
+     * the internal queue.
+     *
+     * <p>The {@code minConsume} argument is used to force {@code dequeue()} 
into
+     * consuming that number of messages regardless of the channel's auto
+     * reading configuration.
+     *
+     * @see #read(ChannelHandlerContext)
+     * @see #channelRead(ChannelHandlerContext, Object)
+     */
+    private int dequeue(ChannelHandlerContext ctx, int minConsume) {
+        int consumed = 0;
+
+        // fireChannelRead(...) may call ctx.read() and so this method may 
reentrance. Because of this we need to
+        // check if queue was set to null in the meantime and if so break the 
loop.
+        while (queue != null && (consumed < minConsume || 
config.isAutoRead())) {
+            Object msg = queue.poll();
+            if (msg == null) {
+                break;
+            }
+
+            ++consumed;
+            ctx.fireChannelRead(msg);
+        }
+
+        // We're firing a completion event every time one (or more)
+        // messages were consumed and the queue ended up being drained
+        // to an empty state.
+        if (queue != null && queue.isEmpty()) {
+            queue.recycle();
+            queue = null;
+
+            if (consumed > 0) {
+                ctx.fireChannelReadComplete();
+            }
+        }
+
+        return consumed;
+    }
+
+    /**
+     * A recyclable {@link ArrayDeque}.
+     */
+    private static final class RecyclableArrayDeque extends ArrayDeque<Object> 
{
+
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * A value of {@code 2} should be a good choice for most scenarios.
+         */
+        private static final int DEFAULT_NUM_ELEMENTS = 2;
+
+        private static final Recycler<RecyclableArrayDeque> RECYCLER =
+                new Recycler<RecyclableArrayDeque>() {
+                    @Override
+                    protected RecyclableArrayDeque 
newObject(Handle<RecyclableArrayDeque> handle) {
+                        return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, 
handle);
+                    }
+                };
+
+        public static RecyclableArrayDeque newInstance() {
+            return RECYCLER.get();
+        }
+
+        private final Handle<RecyclableArrayDeque> handle;
+
+        private RecyclableArrayDeque(int numElements, 
Handle<RecyclableArrayDeque> handle) {
+            super(numElements);
+            this.handle = handle;
+        }
+
+        public void recycle() {
+            clear();
+            handle.recycle(this);
+        }
+    }
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/PulsarFlowControlHandlerTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/PulsarFlowControlHandlerTest.java
new file mode 100644
index 00000000000..4f62fcbb64c
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/PulsarFlowControlHandlerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.ArrayList;
+import java.util.List;
+import org.testng.annotations.Test;
+
+/**
+ * Tests the auto-read behavior of {@link PulsarFlowControlHandler} that 
Pulsar's reactive request throttling
+ * depends on: a downstream handler disabling auto-read from inside {@code 
channelRead} must stop the delivery
+ * of queued messages immediately. Netty's {@code FlowControlHandler} lost 
this behavior in Netty 4.2.15
+ * (netty/netty#16837), which is why Pulsar carries this copy of the previous 
implementation.
+ */
+public class PulsarFlowControlHandlerTest {
+
+    /**
+     * Downstream handler that records received messages and disables 
auto-read whenever the number of received
+     * messages reaches the next configured threshold, mimicking how 
ServerCnxThrottleTracker pauses a connection
+     * while processing a message.
+     */
+    private static class ThrottlingHandler extends 
ChannelInboundHandlerAdapter {
+        private final List<Object> received = new ArrayList<>();
+        private int pauseAtCount;
+
+        ThrottlingHandler(int pauseAtCount) {
+            this.pauseAtCount = pauseAtCount;
+        }
+
+        void pauseAgainAtCount(int count) {
+            this.pauseAtCount = count;
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            received.add(msg);
+            if (received.size() == pauseAtCount) {
+                ctx.channel().config().setAutoRead(false);
+            }
+        }
+    }
+
+    @Test
+    public void shouldStopDeliveryWhenAutoReadIsDisabledDuringChannelRead() {
+        PulsarFlowControlHandler flowControlHandler = new 
PulsarFlowControlHandler();
+        ThrottlingHandler throttlingHandler = new ThrottlingHandler(1);
+        EmbeddedChannel channel = new EmbeddedChannel(flowControlHandler, 
throttlingHandler);
+
+        channel.writeInbound("1", "2", "3", "4", "5");
+
+        // the downstream handler disabled auto-read while processing the 
first message; the remaining messages
+        // must be held in the flow control handler's queue instead of being 
delivered
+        assertEquals(throttlingHandler.received.size(), 1);
+        assertFalse(flowControlHandler.isQueueEmpty());
+
+        // re-enabling auto-read resumes delivery; the downstream handler 
pauses again on the next message,
+        // so exactly one more message must be delivered (per-message 
auto-read granularity)
+        throttlingHandler.pauseAgainAtCount(2);
+        channel.config().setAutoRead(true);
+        assertEquals(throttlingHandler.received.size(), 2);
+        assertFalse(flowControlHandler.isQueueEmpty());
+
+        // with auto-read left enabled, the rest of the queue drains
+        throttlingHandler.pauseAgainAtCount(-1);
+        channel.config().setAutoRead(true);
+        assertEquals(throttlingHandler.received.size(), 5);
+        assertTrue(flowControlHandler.isQueueEmpty());
+
+        assertFalse(channel.finish());
+    }
+
+    @Test
+    public void shouldDeliverOneMessagePerReadWhenAutoReadIsDisabled() {
+        PulsarFlowControlHandler flowControlHandler = new 
PulsarFlowControlHandler();
+        ThrottlingHandler throttlingHandler = new ThrottlingHandler(0);
+        EmbeddedChannel channel = new EmbeddedChannel(flowControlHandler, 
throttlingHandler);
+        channel.config().setAutoRead(false);
+
+        // channel activation (with auto-read initially enabled) issued one 
read() through the pipeline, so there
+        // is one outstanding message of read demand that the first written 
message satisfies
+        channel.writeInbound("1", "2", "3");
+        assertEquals(throttlingHandler.received.size(), 1);
+
+        channel.read();
+        assertEquals(throttlingHandler.received.size(), 2);
+
+        channel.read();
+        assertEquals(throttlingHandler.received.size(), 3);
+        assertTrue(flowControlHandler.isQueueEmpty());
+
+        assertFalse(channel.finish());
+    }
+}

Reply via email to