This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dcb41fd76c1 PIP-468: V5 plumbing-knob test coverage (#25695)
dcb41fd76c1 is described below
commit dcb41fd76c131c645a0fc05cea4c127b1943e29c
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 21:19:47 2026 -0700
PIP-468: V5 plumbing-knob test coverage (#25695)
---
.../client/api/v5/V5ClientBuilderConfigTest.java | 109 +++++++++++++++++++
.../client/api/v5/V5ClientLifecycleTest.java | 93 ++++++++++++++++
.../v5/V5ConsumerSubscriptionPropertiesTest.java | 118 +++++++++++++++++++++
.../api/v5/V5MessageReplicationClustersTest.java | 89 ++++++++++++++++
4 files changed, 409 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java
new file mode 100644
index 00000000000..6cd4200058f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
+import io.opentelemetry.api.OpenTelemetry;
+import java.lang.reflect.Field;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 {@link PulsarClientBuilder} configuration knobs that don't
+ * have observable end-to-end behaviour against a single in-process broker:
+ * {@code listenerName}, {@code description}, {@code memoryLimit}, and
+ * {@code openTelemetry}. These all just plumb a value into the underlying v4
+ * {@code ClientConfigurationData} — so the tests reflect into the wrapped v4
+ * client and assert the value made it through.
+ *
+ * <p>If any of these setters silently dropped the value, the only way it would
+ * surface today is by a user reporting that their telemetry collector / broker
+ * listener / memory cap doesn't apply. These tests pin the contract.
+ */
+public class V5ClientBuilderConfigTest extends V5ClientBaseTest {
+
+    /**
+     * Builds a V5 client with {@code listenerName("internal")} and checks the
+     * value reported by the wrapped v4 configuration.
+     */
+    @Test
+    public void testListenerNamePropagates() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .listenerName("internal")
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        assertEquals(conf.getListenerName(), "internal",
+                "listenerName must propagate to the underlying v4 client config");
+    }
+
+    /**
+     * Builds a V5 client with {@code description("v5-test-client")} and checks
+     * the value reported by the wrapped v4 configuration.
+     */
+    @Test
+    public void testDescriptionPropagates() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .description("v5-test-client")
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        assertEquals(conf.getDescription(), "v5-test-client",
+                "description must propagate to the underlying v4 client config");
+    }
+
+    /**
+     * Builds a V5 client with a 64 MB {@link MemorySize} limit and checks that
+     * the wrapped v4 configuration reports the same limit expressed in bytes.
+     */
+    @Test
+    public void testMemoryLimitPropagates() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .memoryLimit(MemorySize.ofMegabytes(64))
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        assertEquals(conf.getMemoryLimitBytes(), 64L * 1024 * 1024,
+                "memoryLimit must propagate to the underlying v4 client config");
+    }
+
+    /**
+     * Builds a V5 client with a caller-supplied {@link OpenTelemetry} and checks
+     * that the wrapped v4 configuration holds that very instance.
+     */
+    @Test
+    public void testOpenTelemetryPropagates() throws Exception {
+        OpenTelemetry custom = OpenTelemetry.noop();
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .openTelemetry(custom)
+                .build();
+
+        ClientConfigurationData conf = readV4Conf(client);
+        // Same instance — the v4 layer does not clone or wrap the OpenTelemetry handle.
+        assertSame(conf.getOpenTelemetry(), custom,
+                "openTelemetry instance must be the exact one the user supplied");
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Reflectively reads the {@code v4Client} field declared on the runtime
+     * class of {@code v5Client} and returns that client's configuration.
+     *
+     * @param v5Client the V5 client to inspect
+     * @return the configuration exposed by the wrapped {@link PulsarClientImpl}
+     * @throws Exception if the field does not exist or cannot be read
+     */
+    private static ClientConfigurationData readV4Conf(PulsarClient v5Client) throws Exception {
+        Field f = v5Client.getClass().getDeclaredField("v4Client");
+        f.setAccessible(true);
+        Object v4Client = f.get(v5Client);
+        assertNotNull(v4Client, "expected v4Client on V5 PulsarClient");
+        // Fail with a readable message instead of a bare ClassCastException.
+        if (!(v4Client instanceof PulsarClientImpl)) {
+            throw new AssertionError("expected PulsarClientImpl, got: " + v4Client.getClass());
+        }
+        return ((PulsarClientImpl) v4Client).getConfiguration();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java
new file mode 100644
index 00000000000..57dc8977aa5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for V5 {@link PulsarClient} lifecycle entry points that the
+ * existing test suite doesn't exercise: {@link PulsarClient#closeAsync()},
+ * {@link PulsarClient#shutdown()}, and {@link PulsarClient#newTransactionAsync()}.
+ *
+ * <p>The synchronous {@code close()} and {@code newTransaction()} are already
+ * exercised heavily by every other V5 test (close via {@code @Cleanup} /
+ * {@code @AfterClass}, transactions via {@link V5TransactionTest}). The async /
+ * shutdown variants share most of the v4 plumbing, but had no direct coverage
+ * — these tests pin the contract.
+ */
+public class V5ClientLifecycleTest extends V5ClientBaseTest {
+
+    /**
+     * Closes a fresh V5 client through {@code closeAsync()} and checks that the
+     * wrapped v4 client reports {@code isClosed()} once the future completes.
+     */
+    @Test
+    public void testCloseAsyncCompletes() throws Exception {
+        PulsarClient client = newV5Client();
+        Object v4Client = readField(client, "v4Client");
+        // Guard first: the instanceof message below dereferences v4Client.
+        assertNotNull(v4Client, "expected v4Client on V5 PulsarClient");
+        assertTrue(v4Client instanceof PulsarClientImpl,
+                "expected v4Client to be a PulsarClientImpl, got " + v4Client.getClass());
+
+        client.closeAsync().get(10, java.util.concurrent.TimeUnit.SECONDS);
+        assertTrue(((PulsarClientImpl) v4Client).isClosed(),
+                "underlying v4 client must be closed after closeAsync()");
+    }
+
+    /**
+     * Calls {@code shutdown()} twice on a fresh V5 client; the test passes when
+     * neither call throws.
+     */
+    @Test
+    public void testShutdownDelegatesToV4() throws Exception {
+        // shutdown() is the v4 "fast" path: stops executors, releases connections,
+        // but deliberately does not flip the client's state to Closed (so
+        // isClosed() can still return false). The contract for V5 here is just
+        // "delegate to v4 without throwing"; observable post-shutdown behaviour
+        // is v4's responsibility and is exercised by the v4 test suite. This
+        // test pins the V5 → v4 delegation.
+        PulsarClient client = newV5Client();
+        client.shutdown();
+        // Calling shutdown() again must remain side-effect-free and not throw.
+        client.shutdown();
+    }
+
+    /**
+     * Opens a transaction through {@code newTransactionAsync()} on a client
+     * built with a {@link TransactionPolicy} and checks that it starts in the
+     * {@code OPEN} state.
+     */
+    @Test
+    public void testNewTransactionAsyncReturnsOpenTransaction() throws Exception {
+        // Need a client with transactions enabled — the shared v5Client doesn't.
+        PulsarClient client = track(PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build())
+                .build());
+
+        Transaction txn = client.newTransactionAsync()
+                .get(10, java.util.concurrent.TimeUnit.SECONDS);
+        assertNotNull(txn, "newTransactionAsync() future must resolve to a Transaction");
+        try {
+            assertEquals(txn.state(), Transaction.State.OPEN,
+                    "freshly opened transaction must be in OPEN state");
+        } finally {
+            // Clean up — abort even when the assertion fails, to leave no
+            // dangling txn on the broker.
+            txn.abort();
+        }
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Reflectively reads a field declared directly on the runtime class of
+     * {@code target} (superclasses are not searched).
+     *
+     * @param target the object to inspect
+     * @param name the declared field name
+     * @return the field's current value, possibly {@code null}
+     * @throws Exception if the field does not exist or cannot be read
+     */
+    private static Object readField(Object target, String name) throws Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        return f.get(target);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
new file mode 100644
index 00000000000..719e8fb3a1b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#subscriptionProperties(Map)}: the V5
+ * setter must propagate to every per-segment v4 {@code Consumer}'s
+ * {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData}, which
+ * is what the v4 wire layer ships to the broker on subscribe.
+ */
+public class V5ConsumerSubscriptionPropertiesTest extends V5ClientBaseTest {
+
+    /**
+     * Subscribes to a topic created with {@code newScalableTopic(1)} and checks
+     * that the single underlying v4 consumer's config carries the properties
+     * given to the V5 builder.
+     */
+    @Test
+    public void testSubscriptionPropertiesPropagateToV4Consumer() throws Exception {
+        String topic = newScalableTopic(1);
+        Map<String, String> props = Map.of("env", "prod", "team", "data-platform");
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("sub-props-test")
+                .subscriptionProperties(props)
+                .subscribe();
+
+        Map<String, String> actual = readSubscriptionPropertiesFromV4(consumer);
+        assertEquals(actual, props,
+                "V5 subscriptionProperties must propagate to the v4 consumer config");
+    }
+
+    /**
+     * Subscribes to a topic created with {@code newScalableTopic(3)} and checks
+     * that each of the three underlying v4 consumers carries the same
+     * properties.
+     */
+    @Test
+    public void testSubscriptionPropertiesAppliesToEverySegment() throws Exception {
+        String topic = newScalableTopic(3);
+        Map<String, String> props = Map.of("region", "us-east-1");
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("sub-props-multi-test")
+                .subscriptionProperties(props)
+                .subscribe();
+
+        Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
+        assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment");
+        for (CompletableFuture<?> future : asConsumerFutures(segmentConsumers)) {
+            Object v4Consumer = future.get();
+            Map<String, String> v4Props = readConfSubscriptionProperties(v4Consumer);
+            assertEquals(v4Props, props,
+                    "every segment's v4 Consumer must carry the same subscriptionProperties");
+        }
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Returns the subscription properties held by the only segment consumer of
+     * {@code consumer}, asserting that there is exactly one.
+     *
+     * @param consumer the V5 queue consumer to inspect
+     * @return the properties read from that v4 consumer's {@code conf}
+     * @throws Exception if reflection fails or the consumer future fails
+     */
+    private static Map<String, String> readSubscriptionPropertiesFromV4(QueueConsumer<?> consumer)
+            throws Exception {
+        Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
+        assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test");
+        CompletableFuture<?> future = asConsumerFutures(segmentConsumers).iterator().next();
+        Object v4Consumer = future.get();
+        return readConfSubscriptionProperties(v4Consumer);
+    }
+
+    /**
+     * Reflectively reads the {@code segmentConsumers} field declared on the
+     * runtime class of {@code consumer}.
+     *
+     * @param consumer the V5 queue consumer to inspect
+     * @return the map stored in that field; never {@code null}
+     * @throws Exception if the field does not exist or cannot be read
+     */
+    // The field's generic type is erased at runtime, so the cast cannot be checked.
+    @SuppressWarnings("unchecked")
+    private static Map<Long, ?> readSegmentConsumers(QueueConsumer<?> consumer) throws Exception {
+        Field f = consumer.getClass().getDeclaredField("segmentConsumers");
+        f.setAccessible(true);
+        Object map = f.get(consumer);
+        assertNotNull(map, "expected segmentConsumers map on V5 queue consumer");
+        return (Map<Long, ?>) map;
+    }
+
+    /**
+     * Views the values of {@code segmentConsumers} as futures. The cast is
+     * unchecked: a value of any other type surfaces as a
+     * {@link ClassCastException} when the caller iterates.
+     */
+    @SuppressWarnings("unchecked")
+    private static Iterable<CompletableFuture<?>> asConsumerFutures(Map<Long, ?> segmentConsumers) {
+        return (Iterable<CompletableFuture<?>>) (Iterable<?>) segmentConsumers.values();
+    }
+
+    /**
+     * Finds the {@code conf} field on {@code v4Consumer}'s class or the nearest
+     * superclass declaring it, then calls {@code getSubscriptionProperties()}
+     * on its value.
+     *
+     * @param v4Consumer the v4 consumer instance to inspect
+     * @return the value returned by {@code getSubscriptionProperties()}
+     * @throws NoSuchFieldException if no class in the hierarchy declares {@code conf}
+     * @throws Exception if any other reflective access fails
+     */
+    @SuppressWarnings("unchecked")
+    private static Map<String, String> readConfSubscriptionProperties(Object v4Consumer) throws Exception {
+        // ConsumerBase#conf is protected; walk the class hierarchy.
+        for (Class<?> c = v4Consumer.getClass(); c != null; c = c.getSuperclass()) {
+            Field f;
+            try {
+                f = c.getDeclaredField("conf");
+            } catch (NoSuchFieldException ignored) {
+                // Not declared at this level; try the superclass.
+                continue;
+            }
+            f.setAccessible(true);
+            Object conf = f.get(v4Consumer);
+            var getter = conf.getClass().getMethod("getSubscriptionProperties");
+            return (Map<String, String>) getter.invoke(conf);
+        }
+        throw new NoSuchFieldException("conf not found on " + v4Consumer.getClass());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java
new file mode 100644
index 00000000000..0e3c1f624bf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.List;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link MessageBuilder#replicationClusters(java.util.List)}: the
+ * V5 setter must propagate to the v4 {@code TypedMessageBuilder} that V5 uses
+ * internally, so the cluster-restriction list lands in the message metadata
+ * the broker stores.
+ *
+ * <p>Verified by sending a V5 message with an explicit cluster restriction and
+ * reflecting into the V5 {@code MessageV5} wrapper to inspect the underlying
+ * v4 {@code MessageImpl.getReplicateTo()} — that's where the message metadata
+ * becomes observable.
+ */
+public class V5MessageReplicationClustersTest extends V5ClientBaseTest {
+
+    /**
+     * Sends one message with an explicit replication-cluster list, receives it
+     * on a queue consumer, and checks that the wrapped v4 message reports the
+     * same list from {@code getReplicateTo()}.
+     */
+    @Test
+    public void testReplicationClustersPropagatesToMessageMetadata() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("repl-clusters-watcher")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Restrict to the local cluster only — same effective behaviour as the
+        // namespace default, but exercises the explicit-cluster code path.
+        List<String> clusters = List.of(SharedPulsarCluster.CLUSTER_NAME);
+
+        producer.newMessage()
+                .value("hello")
+                .replicationClusters(clusters)
+                .send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "consumer should receive the produced message");
+
+        // Reach into the V5 MessageV5 wrapper to inspect the underlying v4 metadata.
+        MessageImpl<?> v4Impl = readUnderlyingV4Message(msg);
+        assertEquals(v4Impl.getReplicateTo(), clusters,
+                "replicationClusters from V5 must land in the message metadata");
+    }
+
+    /**
+     * Reflectively reads the {@code v4Message} field declared on the runtime
+     * class of {@code v5Msg} and returns it as a {@link MessageImpl}.
+     *
+     * @param v5Msg the V5 message to inspect
+     * @return the wrapped v4 message
+     * @throws AssertionError if the field is {@code null} or not a {@link MessageImpl}
+     * @throws Exception if the field does not exist or cannot be read
+     */
+    private static MessageImpl<?> readUnderlyingV4Message(Message<?> v5Msg) throws Exception {
+        Field f = v5Msg.getClass().getDeclaredField("v4Message");
+        f.setAccessible(true);
+        Object v4Msg = f.get(v5Msg);
+        assertNotNull(v4Msg, "expected v4Message inside V5 MessageV5 wrapper");
+        if (!(v4Msg instanceof MessageImpl)) {
+            throw new AssertionError("expected MessageImpl, got: " + v4Msg.getClass());
+        }
+        return (MessageImpl<?>) v4Msg;
+    }
+}