This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb41fd76c1 PIP-468: V5 plumbing-knob test coverage (#25695)
dcb41fd76c1 is described below

commit dcb41fd76c131c645a0fc05cea4c127b1943e29c
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 21:19:47 2026 -0700

    PIP-468: V5 plumbing-knob test coverage (#25695)
---
 .../client/api/v5/V5ClientBuilderConfigTest.java   | 109 +++++++++++++++++++
 .../client/api/v5/V5ClientLifecycleTest.java       |  93 ++++++++++++++++
 .../v5/V5ConsumerSubscriptionPropertiesTest.java   | 118 +++++++++++++++++++++
 .../api/v5/V5MessageReplicationClustersTest.java   |  89 ++++++++++++++++
 4 files changed, 409 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java
new file mode 100644
index 00000000000..6cd4200058f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
+import io.opentelemetry.api.OpenTelemetry;
+import java.lang.reflect.Field;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 {@link PulsarClientBuilder} configuration knobs that 
don't
+ * have observable end-to-end behaviour against a single in-process broker:
+ * {@code listenerName}, {@code description}, {@code memoryLimit}, and
+ * {@code openTelemetry}. These all just plumb a value into the underlying v4
+ * {@code ClientConfigurationData} — so the tests reflect into the wrapped v4
+ * client and assert the value made it through.
+ *
+ * <p>If any of these setters silently dropped the value, the only way it would
+ * surface today is by a user reporting that their telemetry collector / broker
+ * listener / memory cap doesn't apply. These tests pin the contract.
+ */
+public class V5ClientBuilderConfigTest extends V5ClientBaseTest {
+
+    @Test
+    public void testListenerNamePropagates() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .listenerName("internal")
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        assertEquals(conf.getListenerName(), "internal",
+                "listenerName must propagate to the underlying v4 client 
config");
+    }
+
+    @Test
+    public void testDescriptionPropagates() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .description("v5-test-client")
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        assertEquals(conf.getDescription(), "v5-test-client",
+                "description must propagate to the underlying v4 client 
config");
+    }
+
+    @Test
+    public void testMemoryLimitPropagates() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .memoryLimit(MemorySize.ofMegabytes(64))
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        assertEquals(conf.getMemoryLimitBytes(), 64L * 1024 * 1024,
+                "memoryLimit must propagate to the underlying v4 client 
config");
+    }
+
+    @Test
+    public void testOpenTelemetryPropagates() throws Exception {
+        OpenTelemetry custom = OpenTelemetry.noop();
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .openTelemetry(custom)
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        // Same instance — the v4 layer does not clone or wrap the 
OpenTelemetry handle.
+        assertSame(conf.getOpenTelemetry(), custom,
+                "openTelemetry instance must be the exact one the user 
supplied");
+    }
+
+    // --- Helpers ---
+
+    private static ClientConfigurationData readV4Conf(PulsarClient v5Client) 
throws Exception {
+        Field f = v5Client.getClass().getDeclaredField("v4Client");
+        f.setAccessible(true);
+        Object v4Client = f.get(v5Client);
+        assertNotNull(v4Client, "expected v4Client on V5 PulsarClient");
+        return ((PulsarClientImpl) v4Client).getConfiguration();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java
new file mode 100644
index 00000000000..57dc8977aa5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for V5 {@link PulsarClient} lifecycle entry points that the
+ * existing test suite doesn't exercise: {@link PulsarClient#closeAsync()},
+ * {@link PulsarClient#shutdown()}, and {@link 
PulsarClient#newTransactionAsync()}.
+ *
+ * <p>The synchronous {@code close()} and {@code newTransaction()} are already
+ * exercised heavily by every other V5 test (close via {@code @Cleanup} /
+ * {@code @AfterClass}, transactions via {@link V5TransactionTest}). The async 
/
+ * shutdown variants share most of the v4 plumbing, but had no direct coverage
+ * — these tests pin the contract.
+ */
+public class V5ClientLifecycleTest extends V5ClientBaseTest {
+
+    @Test
+    public void testCloseAsyncCompletes() throws Exception {
+        PulsarClient client = newV5Client();
+        Object v4Client = readField(client, "v4Client");
+        assertTrue(v4Client instanceof PulsarClientImpl,
+                "expected v4Client to be a PulsarClientImpl, got " + 
v4Client.getClass());
+
+        client.closeAsync().get(10, java.util.concurrent.TimeUnit.SECONDS);
+        assertTrue(((PulsarClientImpl) v4Client).isClosed(),
+                "underlying v4 client must be closed after closeAsync()");
+    }
+
+    @Test
+    public void testShutdownDelegatesToV4() throws Exception {
+        // shutdown() is the v4 "fast" path: stops executors, releases 
connections,
+        // but deliberately does not flip the client's state to Closed (so
+        // isClosed() can still return false). The contract for V5 here is just
+        // "delegate to v4 without throwing"; observable post-shutdown 
behaviour
+        // is v4's responsibility and is exercised by the v4 test suite. This
+        // test pins the V5 → v4 delegation.
+        PulsarClient client = newV5Client();
+        client.shutdown();
+        // Calling shutdown() again must remain side-effect-free and not throw.
+        client.shutdown();
+    }
+
+    @Test
+    public void testNewTransactionAsyncReturnsOpenTransaction() throws 
Exception {
+        // Need a client with transactions enabled — the shared v5Client 
doesn't.
+        PulsarClient client = track(PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build())
+                .build());
+
+        Transaction txn = client.newTransactionAsync()
+                .get(10, java.util.concurrent.TimeUnit.SECONDS);
+        assertNotNull(txn, "newTransactionAsync() future must resolve to a 
Transaction");
+        assertEquals(txn.state(), Transaction.State.OPEN,
+                "freshly opened transaction must be in OPEN state");
+        // Clean up — abort to leave no dangling txn on the broker.
+        txn.abort();
+    }
+
+    // --- Helpers ---
+
+    private static Object readField(Object target, String name) throws 
Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        return f.get(target);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
new file mode 100644
index 00000000000..719e8fb3a1b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#subscriptionProperties(Map)}: the 
V5
+ * setter must propagate to every per-segment v4 {@code Consumer}'s
+ * {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData}, which
+ * is what the v4 wire layer ships to the broker on subscribe.
+ */
+public class V5ConsumerSubscriptionPropertiesTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSubscriptionPropertiesPropagateToV4Consumer() throws 
Exception {
+        String topic = newScalableTopic(1);
+        Map<String, String> props = Map.of("env", "prod", "team", 
"data-platform");
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("sub-props-test")
+                .subscriptionProperties(props)
+                .subscribe();
+
+        Map<String, String> actual = 
readSubscriptionPropertiesFromV4(consumer);
+        assertEquals(actual, props,
+                "V5 subscriptionProperties must propagate to the v4 consumer 
config");
+    }
+
+    @Test
+    public void testSubscriptionPropertiesAppliesToEverySegment() throws 
Exception {
+        String topic = newScalableTopic(3);
+        Map<String, String> props = Map.of("region", "us-east-1");
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("sub-props-multi-test")
+                .subscriptionProperties(props)
+                .subscribe();
+
+        Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
+        assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per 
segment");
+        for (CompletableFuture<?> future : 
asConsumerFutures(segmentConsumers)) {
+            Object v4Consumer = future.get();
+            Map<String, String> v4Props = 
readConfSubscriptionProperties(v4Consumer);
+            assertEquals(v4Props, props,
+                    "every segment's v4 Consumer must carry the same 
subscriptionProperties");
+        }
+    }
+
+    // --- Helpers ---
+
+    private static Map<String, String> 
readSubscriptionPropertiesFromV4(QueueConsumer<?> consumer)
+            throws Exception {
+        Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
+        assertEquals(segmentConsumers.size(), 1, "expected a single segment 
for this test");
+        CompletableFuture<?> future = 
asConsumerFutures(segmentConsumers).iterator().next();
+        Object v4Consumer = future.get();
+        return readConfSubscriptionProperties(v4Consumer);
+    }
+
+    private static Map<Long, ?> readSegmentConsumers(QueueConsumer<?> 
consumer) throws Exception {
+        Field f = consumer.getClass().getDeclaredField("segmentConsumers");
+        f.setAccessible(true);
+        Object map = f.get(consumer);
+        assertNotNull(map, "expected segmentConsumers map on V5 queue 
consumer");
+        return (Map<Long, ?>) map;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Iterable<CompletableFuture<?>> asConsumerFutures(Map<Long, 
?> segmentConsumers) {
+        return (Iterable<CompletableFuture<?>>) (Iterable<?>) 
segmentConsumers.values();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, String> readConfSubscriptionProperties(Object 
v4Consumer) throws Exception {
+        // ConsumerBase#conf is protected; walk the class hierarchy.
+        Class<?> c = v4Consumer.getClass();
+        while (c != null) {
+            try {
+                Field f = c.getDeclaredField("conf");
+                f.setAccessible(true);
+                Object conf = f.get(v4Consumer);
+                var getter = 
conf.getClass().getMethod("getSubscriptionProperties");
+                return (Map<String, String>) getter.invoke(conf);
+            } catch (NoSuchFieldException e) {
+                c = c.getSuperclass();
+            }
+        }
+        throw new NoSuchFieldException("conf not found on " + 
v4Consumer.getClass());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java
new file mode 100644
index 00000000000..0e3c1f624bf
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.List;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link MessageBuilder#replicationClusters(java.util.List)}: the
+ * V5 setter must propagate to the v4 {@code TypedMessageBuilder} that V5 uses
+ * internally, so the cluster-restriction list lands in the message metadata
+ * the broker stores.
+ *
+ * <p>Verified by sending a V5 message with an explicit cluster restriction and
+ * reflecting into the V5 {@code MessageV5} wrapper to inspect the underlying
+ * v4 {@code MessageImpl.getReplicateTo()} — that's where the message metadata
+ * becomes observable.
+ */
+public class V5MessageReplicationClustersTest extends V5ClientBaseTest {
+
+    @Test
+    public void testReplicationClustersPropagatesToMessageMetadata() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("repl-clusters-watcher")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Restrict to the local cluster only — same effective behaviour as the
+        // namespace default, but exercises the explicit-cluster code path.
+        List<String> clusters = List.of(SharedPulsarCluster.CLUSTER_NAME);
+
+        producer.newMessage()
+                .value("hello")
+                .replicationClusters(clusters)
+                .send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "consumer should receive the produced message");
+
+        // Reach into the V5 MessageV5 wrapper to inspect the underlying v4 
metadata.
+        MessageImpl<?> v4Impl = readUnderlyingV4Message(msg);
+        assertEquals(v4Impl.getReplicateTo(), clusters,
+                "replicationClusters from V5 must land in the message 
metadata");
+    }
+
+    private static MessageImpl<?> readUnderlyingV4Message(Message<?> v5Msg) 
throws Exception {
+        Field f = v5Msg.getClass().getDeclaredField("v4Message");
+        f.setAccessible(true);
+        Object v4Msg = f.get(v5Msg);
+        assertNotNull(v4Msg, "expected v4Message inside V5 MessageV5 wrapper");
+        if (!(v4Msg instanceof MessageImpl)) {
+            throw new AssertionError("expected MessageImpl, got: " + 
v4Msg.getClass());
+        }
+        return (MessageImpl<?>) v4Msg;
+    }
+}

Reply via email to