This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f447332ba [feat] PIP-468: V5 flow-control knob test coverage (#25689)
a1f447332ba is described below

commit a1f447332ba499b4a7b0c89099dc28a0497f96af
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 18:33:18 2026 -0700

    [feat] PIP-468: V5 flow-control knob test coverage (#25689)
---
 .../client/api/v5/V5ConsumerPriorityLevelTest.java | 162 ++++++++++++++++++++
 .../api/v5/V5ConsumerReceiverQueueSizeTest.java    | 116 ++++++++++++++
 .../client/api/v5/V5ProducerFlowControlTest.java   | 169 +++++++++++++++++++++
 3 files changed, 447 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java
new file mode 100644
index 00000000000..448c26eaaa2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#priorityLevel(int)}: lower priority
+ * values dispatch first within a Shared subscription. While the consumer with
+ * the lowest priority value still has room in its prefetch queue, consumers at
+ * higher priority values see no traffic; they only start receiving once that
+ * consumer's prefetch is full.
+ *
+ * <p>Single-segment topic to keep the dispatch deterministic — V5 broker-side
+ * priority is enforced per-segment, so a multi-segment topic can fan messages
+ * across segments and obscure the priority ordering.
+ */
+public class V5ConsumerPriorityLevelTest extends V5ClientBaseTest {
+
+    /**
+     * Two consumers, priority 0 (high) and priority 1 (low). With both 
subscribed
+     * before any traffic, every message produced should land on the 
high-priority
+     * consumer and none on the low-priority one — until the high consumer's
+     * prefetch is full, which we keep clear by acking each message.
+     */
+    @Test
+    public void testHigherPriorityConsumerReceivesAllWhenPrefetchAvailable() 
throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        QueueConsumer<String> high = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("priority-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .priorityLevel(0)
+                .receiverQueueSize(50)
+                .consumerName("high")
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> low = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("priority-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .priorityLevel(1)
+                .receiverQueueSize(50)
+                .consumerName("low")
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        int n = 20;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "msg-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // High-priority consumer drains everything (acking as it goes so its
+        // prefetch never fills). The broker's priority dispatcher must hand 
all
+        // messages to the priority-0 consumer before considering priority-1.
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = high.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "high-priority consumer must receive message #" 
+ (i + 1));
+            received.add(msg.value());
+            high.acknowledge(msg.id());
+        }
+        assertEquals(received, sent, "high-priority consumer must receive 
every message");
+
+        // Low-priority consumer must have seen nothing.
+        Message<String> stragglers = low.receive(Duration.ofMillis(500));
+        assertNull(stragglers, "low-priority consumer must not receive while 
high one is draining");
+    }
+
+    /**
+     * When the high-priority consumer pauses (no acks, prefetch fills), the
+     * broker overflows to the low-priority consumer for further dispatch.
+     */
+    @Test
+    public void testLowerPriorityConsumerReceivesOverflow() throws Exception {
+        String topic = newScalableTopic(1);
+        int highReceiverQueue = 5;
+
+        @Cleanup
+        QueueConsumer<String> high = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("overflow-priority-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .priorityLevel(0)
+                .receiverQueueSize(highReceiverQueue)
+                .consumerName("high-paused")
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> low = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("overflow-priority-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .priorityLevel(1)
+                .receiverQueueSize(50)
+                .consumerName("low-active")
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Publish more than the high consumer's prefetch — once it's full, the
+        // remainder must spill to the low-priority consumer.
+        int total = highReceiverQueue * 4;
+        for (int i = 0; i < total; i++) {
+            producer.newMessage().key("k-" + i).value("v-" + i).send();
+        }
+
+        // Low-priority consumer must see the spillover (without "high" ever
+        // calling receive(), the broker treats it as having a full prefetch
+        // queue and routes onward).
+        Set<String> lowSeen = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 10_000L;
+        while (lowSeen.size() < total - highReceiverQueue && 
System.currentTimeMillis() < deadline) {
+            Message<String> msg = low.receive(Duration.ofMillis(500));
+            if (msg != null) {
+                lowSeen.add(msg.value());
+                low.acknowledge(msg.id());
+            }
+        }
+        // Allow some tolerance — the boundary between "high prefetch full" and
+        // "broker spills to low" depends on broker timing. The minimum 
guarantee
+        // is that the low consumer receives strictly more than zero overflow.
+        assertEquals(lowSeen.size() >= total - highReceiverQueue * 2 - 1, true,
+                "low-priority consumer must see overflow once high consumer's 
prefetch fills, got: "
+                        + lowSeen.size());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java
new file mode 100644
index 00000000000..592e8369abc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#receiverQueueSize(int)}: the V5 
builder
+ * must wire the user-supplied prefetch depth down to every per-segment v4
+ * {@code ConsumerImpl}. Without this wiring, the V5 setting would be silently
+ * ignored — the v4 layer would default to its own receiver queue size and the
+ * caller would have no way of knowing.
+ *
+ * <p>Verified end-to-end by reaching into the V5 internals via reflection (the
+ * V5 {@link QueueConsumer} interface itself doesn't expose the v4 consumers,
+ * since end users never need them).
+ */
+public class V5ConsumerReceiverQueueSizeTest extends V5ClientBaseTest {
+
+    @Test
+    public void testReceiverQueueSizePropagatesToV4Consumer() throws Exception 
{
+        String topic = newScalableTopic(1);
+        int requested = 17;
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("rq-size-sub")
+                .receiverQueueSize(requested)
+                .subscribe();
+
+        int actual = readV4ReceiverQueueSize(consumer);
+        assertEquals(actual, requested,
+                "V5 receiverQueueSize must propagate to the per-segment v4 
ConsumerImpl");
+    }
+
+    /**
+     * Three-segment topic: the consumer is expected to hold one v4 consumer per
+     * segment, and each of them must report the receiver queue size that was
+     * configured on the V5 builder (9).
+     */
+    @Test
+    public void testReceiverQueueSizeAppliesToEverySegment() throws Exception {
+        final int segmentCount = 3;
+        final int requested = 9;
+        String topic = newScalableTopic(segmentCount);
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("rq-size-multi-sub")
+                .receiverQueueSize(requested)
+                .subscribe();
+
+        Map<Long, ?> bySegment = readSegmentConsumers(consumer);
+        assertEquals(bySegment.size(), segmentCount, "expected one v4 consumer per segment");
+        for (CompletableFuture<?> pending : asConsumerFutures(bySegment)) {
+            // Block until the per-segment consumer is available, then inspect it.
+            assertEquals(invokeGetCurrentReceiverQueueSize(pending.get()), requested,
+                    "every segment's v4 ConsumerImpl must carry the same receiverQueueSize");
+        }
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Returns the receiver queue size reported by the only per-segment v4
+     * consumer behind {@code consumer}. Asserts that exactly one segment
+     * consumer exists.
+     */
+    private static int readV4ReceiverQueueSize(QueueConsumer<?> consumer) throws Exception {
+        Map<Long, ?> bySegment = readSegmentConsumers(consumer);
+        assertEquals(bySegment.size(), 1, "expected a single segment for this test");
+        CompletableFuture<?> onlyFuture = asConsumerFutures(bySegment).iterator().next();
+        return invokeGetCurrentReceiverQueueSize(onlyFuture.get());
+    }
+
+    /**
+     * Reflectively reads the {@code segmentConsumers} field declared on the
+     * runtime class of {@code consumer} and returns it as a map.
+     *
+     * @param consumer the V5 queue consumer to inspect
+     * @return the non-null value of the {@code segmentConsumers} field
+     * @throws NoSuchFieldException if the runtime class declares no such field
+     * @throws ClassCastException if the field value is not a {@link Map}
+     */
+    private static Map<Long, ?> readSegmentConsumers(QueueConsumer<?> consumer) throws Exception {
+        Field field = consumer.getClass().getDeclaredField("segmentConsumers");
+        field.setAccessible(true);
+        Object map = field.get(consumer);
+        assertNotNull(map, "expected segmentConsumers map on V5 queue consumer");
+        // Reflection only yields Object, so the key type cannot be verified at
+        // runtime; the suppression is scoped to this single cast.
+        @SuppressWarnings("unchecked")
+        Map<Long, ?> typed = (Map<Long, ?>) map;
+        return typed;
+    }
+
+    /**
+     * Views the values of {@code segmentConsumers} as futures. The element type
+     * is not checked here; a value that is not a {@link CompletableFuture} would
+     * only fail when the caller iterates.
+     */
+    @SuppressWarnings("unchecked")
+    private static Iterable<CompletableFuture<?>> asConsumerFutures(Map<Long, ?> segmentConsumers) {
+        Iterable<?> untyped = segmentConsumers.values();
+        return (Iterable<CompletableFuture<?>>) untyped;
+    }
+
+    /**
+     * Calls the public no-arg {@code getCurrentReceiverQueueSize} method on
+     * {@code v4Consumer} via reflection and returns its result.
+     */
+    private static int invokeGetCurrentReceiverQueueSize(Object v4Consumer) throws Exception {
+        // Defined on org.apache.pulsar.client.impl.ConsumerBase — looked up by name
+        // so the test doesn't have to import a non-API class.
+        Object queueSize = v4Consumer.getClass()
+                .getMethod("getCurrentReceiverQueueSize")
+                .invoke(v4Consumer);
+        return (int) queueSize;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java
new file mode 100644
index 00000000000..d303c93a031
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link ProducerBuilder#sendTimeout(Duration)} and
+ * {@link ProducerBuilder#blockIfQueueFull(boolean)}: the V5 builder must wire
+ * each user-supplied flow-control knob down to every per-segment v4
+ * {@code ProducerImpl}. Without this wiring, the V5 setting would be silently
+ * ignored — the v4 layer would default and the caller would have no way of
+ * knowing.
+ *
+ * <p>Behavioural verification of the actual timeout-firing and block-on-full
+ * paths lives in the v4 test suite (e.g. {@code SimpleProducerConsumerTest
+ * .testSendTimeout}); those tests stop the broker mid-send to force the
+ * pending-queue overflow / timeout, which the in-process shared cluster used
+ * here cannot do. The plumbing test suffices as a regression guard for the V5
+ * → v4 mapping.
+ */
+public class V5ProducerFlowControlTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSendTimeoutPropagatesToV4Producer() throws Exception {
+        String topic = newScalableTopic(1);
+        Duration requested = Duration.ofSeconds(7);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .sendTimeout(requested)
+                .create();
+        // V5 segment producers are created lazily on first send — produce a
+        // message so the per-segment v4 ProducerImpl exists to inspect.
+        producer.newMessage().value("warm-up").send();
+
+        ProducerConfigurationData conf = readV4ProducerConf(producer);
+        assertEquals(conf.getSendTimeoutMs(), requested.toMillis(),
+                "V5 sendTimeout must propagate to the per-segment v4 
ProducerImpl");
+    }
+
+    @Test
+    public void testBlockIfQueueFullTrue() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .create();
+        producer.newMessage().value("warm-up").send();
+
+        ProducerConfigurationData conf = readV4ProducerConf(producer);
+        assertTrue(conf.isBlockIfQueueFull(),
+                "blockIfQueueFull(true) must propagate to the v4 
ProducerImpl");
+    }
+
+    @Test
+    public void testBlockIfQueueFullFalse() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .blockIfQueueFull(false)
+                .create();
+        producer.newMessage().value("warm-up").send();
+
+        ProducerConfigurationData conf = readV4ProducerConf(producer);
+        assertFalse(conf.isBlockIfQueueFull(),
+                "blockIfQueueFull(false) must propagate to the v4 
ProducerImpl");
+    }
+
+    /**
+     * Three-segment topic: after publishing 30 differently keyed messages the
+     * producer is expected to hold one v4 producer per segment, each configured
+     * with the send timeout given to the V5 builder (2500 ms).
+     */
+    @Test
+    public void testSendTimeoutAppliesToEverySegment() throws Exception {
+        final int segmentCount = 3;
+        final Duration requested = Duration.ofMillis(2_500);
+        String topic = newScalableTopic(segmentCount);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .sendTimeout(requested)
+                .create();
+        // Spread sends across many keys so every segment lazily materializes its
+        // v4 producer.
+        for (int seq = 0; seq < 30; seq++) {
+            producer.newMessage().key("k-" + seq).value("warm-" + seq).send();
+        }
+
+        Map<Long, ?> bySegment = readSegmentProducers(producer);
+        assertEquals(bySegment.size(), segmentCount, "expected one v4 producer per segment");
+        for (CompletableFuture<?> pending : asProducerFutures(bySegment)) {
+            ProducerConfigurationData segmentConf = readConfField(pending.get());
+            assertEquals(segmentConf.getSendTimeoutMs(), requested.toMillis(),
+                    "every segment's v4 ProducerImpl must carry the same sendTimeout");
+        }
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Returns the configuration of the only per-segment v4 producer behind
+     * {@code producer}. Asserts that exactly one segment producer exists.
+     */
+    private static ProducerConfigurationData readV4ProducerConf(Producer<?> producer) throws Exception {
+        Map<Long, ?> bySegment = readSegmentProducers(producer);
+        assertEquals(bySegment.size(), 1, "expected a single segment for this test");
+        CompletableFuture<?> onlyFuture = asProducerFutures(bySegment).iterator().next();
+        return readConfField(onlyFuture.get());
+    }
+
+    /**
+     * Reflectively reads the {@code segmentProducers} field declared on the
+     * runtime class of {@code producer} and returns it as a map.
+     *
+     * @param producer the V5 producer to inspect
+     * @return the non-null value of the {@code segmentProducers} field
+     * @throws NoSuchFieldException if the runtime class declares no such field
+     * @throws ClassCastException if the field value is not a {@link Map}
+     */
+    private static Map<Long, ?> readSegmentProducers(Producer<?> producer) throws Exception {
+        Field field = producer.getClass().getDeclaredField("segmentProducers");
+        field.setAccessible(true);
+        Object map = field.get(producer);
+        assertNotNull(map, "expected segmentProducers map on V5 producer");
+        // Reflection only yields Object, so the key type cannot be verified at
+        // runtime; the suppression is scoped to this single cast.
+        @SuppressWarnings("unchecked")
+        Map<Long, ?> typed = (Map<Long, ?>) map;
+        return typed;
+    }
+
+    /**
+     * Views the values of {@code segmentProducers} as futures. The element type
+     * is not checked here; a value that is not a {@link CompletableFuture} would
+     * only fail when the caller iterates.
+     */
+    @SuppressWarnings("unchecked")
+    private static Iterable<CompletableFuture<?>> asProducerFutures(Map<Long, ?> segmentProducers) {
+        Iterable<?> untyped = segmentProducers.values();
+        return (Iterable<CompletableFuture<?>>) untyped;
+    }
+
+    /**
+     * Finds the first field named {@code conf} while walking from the runtime
+     * class of {@code v4Producer} up through its superclasses, and returns that
+     * field's value.
+     *
+     * @throws NoSuchFieldException if no class in the hierarchy declares {@code conf}
+     */
+    private static ProducerConfigurationData readConfField(Object v4Producer) throws Exception {
+        // ProducerBase#conf is protected, so it is not visible via getField();
+        // probe each class in the hierarchy with getDeclaredField() instead.
+        for (Class<?> type = v4Producer.getClass(); type != null; type = type.getSuperclass()) {
+            Field confField;
+            try {
+                confField = type.getDeclaredField("conf");
+            } catch (NoSuchFieldException notDeclaredHere) {
+                // Not declared at this level; try the superclass.
+                continue;
+            }
+            confField.setAccessible(true);
+            return (ProducerConfigurationData) confField.get(v4Producer);
+        }
+        throw new NoSuchFieldException("conf not found on " + v4Producer.getClass());
+    }
+}

Reply via email to