This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new bc812ff24df [fix][client] Fix deduplication for
getPartitionedTopicMetadata to include method parameters (#24965)
bc812ff24df is described below
commit bc812ff24df0b54c48e9b60a7e2d5a5dc651a454
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 11 19:09:24 2025 +0530
[fix][client] Fix deduplication for getPartitionedTopicMetadata to include
method parameters (#24965)
Signed-off-by: Vinkal Chudgar <[email protected]>
(cherry picked from commit 0cdab922230d9d39d0eec510e66e5309df134591)
---
.../client/impl/BinaryProtoLookupService.java | 52 +++++++-
.../client/impl/BinaryProtoLookupServiceTest.java | 142 +++++++++++++++++++++
2 files changed, 191 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index c4337f897d8..b51d92137c4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -67,7 +68,7 @@ public class BinaryProtoLookupService implements
LookupService {
private final ConcurrentHashMap<TopicName,
CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<TopicName,
CompletableFuture<PartitionedTopicMetadata>>
+ private final ConcurrentHashMap<PartitionedTopicMetadataKey,
CompletableFuture<PartitionedTopicMetadata>>
partitionedMetadataInProgress = new ConcurrentHashMap<>();
private final LatencyHistogram histoGetBroker;
@@ -182,8 +183,10 @@ public class BinaryProtoLookupService implements
LookupService {
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(
TopicName topicName, boolean metadataAutoCreationEnabled, boolean
useFallbackForNonPIP344Brokers) {
final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
+ final PartitionedTopicMetadataKey key = new
PartitionedTopicMetadataKey(
+ topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
try {
- return partitionedMetadataInProgress.computeIfAbsent(topicName,
tpName -> {
+ return partitionedMetadataInProgress.computeIfAbsent(key, k -> {
CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadata(
serviceNameResolver.resolveHost(), topicName,
metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
@@ -193,7 +196,7 @@ public class BinaryProtoLookupService implements
LookupService {
} finally {
if (newFutureCreated.getValue() != null) {
newFutureCreated.getValue().whenComplete((v, ex) -> {
- partitionedMetadataInProgress.remove(topicName,
newFutureCreated.getValue());
+ partitionedMetadataInProgress.remove(key,
newFutureCreated.getValue());
});
}
}
@@ -491,5 +494,48 @@ public class BinaryProtoLookupService implements
LookupService {
}
+ private static final class PartitionedTopicMetadataKey { // Key type of partitionedMetadataInProgress: topic name plus both lookup flags.
+ private final TopicName topicName;
+ private final boolean metadataAutoCreationEnabled;
+ private final boolean useFallbackForNonPIP344Brokers;
+
+ PartitionedTopicMetadataKey(TopicName topicName,
+ boolean metadataAutoCreationEnabled,
+ boolean useFallbackForNonPIP344Brokers) {
+ this.topicName = topicName;
+ this.metadataAutoCreationEnabled = metadataAutoCreationEnabled;
+ this.useFallbackForNonPIP344Brokers =
useFallbackForNonPIP344Brokers;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // Only an instance of exactly this class can be equal.
+ return false;
+ }
+ PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o;
+ return metadataAutoCreationEnabled ==
that.metadataAutoCreationEnabled
+ && useFallbackForNonPIP344Brokers ==
that.useFallbackForNonPIP344Brokers
+ && Objects.equals(topicName, that.topicName); // Equal only when all three fields match.
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName, metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers); // Hashes the same three fields that equals() compares.
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionedTopicMetadataKey{"
+ + "topicName=" + topicName
+ + ", metadataAutoCreationEnabled=" +
metadataAutoCreationEnabled
+ + ", useFallbackForNonPIP344Brokers=" +
useFallbackForNonPIP344Brokers
+ + '}';
+ }
+ }
+
+
private static final Logger log =
LoggerFactory.getLogger(BinaryProtoLookupService.class);
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index c772cf47337..c8bfa5a96da 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -24,9 +24,13 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
@@ -37,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
@@ -46,6 +51,7 @@ import
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
@@ -191,4 +197,140 @@ public class BinaryProtoLookupServiceTest {
return lookupResult;
}
+
+ /**
+ * Verifies that getPartitionedTopicMetadata() deduplicates concurrent requests
+ * and cleans up after completion.
+ *
+ * <p>First, two concurrent calls with identical parameters ({@code topicName},
+ * {@code metadataAutoCreationEnabled}, {@code useFallbackForNonPIP344Brokers})
+ * should return the same CompletableFuture and trigger only one connection pool
+ * request (deduplication).
+ *
+ * <p>Second, after the future completes, the map entry should be removed so a
+ * subsequent call with the same parameters creates a new future (cleanup).
+ *
+ * <p>This test uses a never-completing connection future to isolate the
+ * deduplication logic without executing the network request path.
+ */
+ @Test(timeOut = 60000)
+ public void testPartitionedMetadataDeduplicationAndCleanup() throws
Exception {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(30000);
+ when(client.getConfiguration()).thenReturn(conf);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+ when(client.getCnxPool()).thenReturn(cnxPool);
+
+ // Never-completing connection prevents the thenAcceptAsync callback
in getPartitionedTopicMetadata
+ // from executing, isolating only the deduplication logic without
network calls.
+ CompletableFuture<ClientCnx> neverCompletes = new
CompletableFuture<>();
+
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(neverCompletes);
+
+ ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
+
+ try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler, null))
{
+
+ TopicName topic = TopicName.get("persistent://public/default/t1");
+ boolean metadataAutoCreationEnabled = true;
+ boolean useFallbackForNonPIP344Brokers = false;
+
+ CompletableFuture<PartitionedTopicMetadata> f1 =
lookup.getPartitionedTopicMetadata(topic,
+ metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
+ CompletableFuture<PartitionedTopicMetadata> f1b =
lookup.getPartitionedTopicMetadata(topic,
+ metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
+
+ // Dedup: same parameters share the same future and one
getConnection call.
+ assertSame(f1b, f1,
+ "Concurrent requests with identical parameters should
return the same future");
+ verify(cnxPool,
times(1)).getConnection(any(InetSocketAddress.class));
+
+ // Complete the future. This triggers the whenComplete callback
that removes the map entry.
+ f1.complete(new PartitionedTopicMetadata(0));
+ assertTrue(f1.isDone());
+
+ // Verify cleanup: after completion, same parameters create a new
future and
+ // trigger another connection request.
+ CompletableFuture<PartitionedTopicMetadata> f2 =
lookup.getPartitionedTopicMetadata(topic,
+ metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
+ assertNotSame(f2, f1,
+ "After completion, the deduplication map entry should be
removed and a new future created");
+ verify(cnxPool,
times(2)).getConnection(any(InetSocketAddress.class));
+ } finally {
+ scheduler.shutdownNow();
+ }
+ }
+
+ /**
+ * Verifies that different parameter combinations are treated as distinct keys.
+ *
+ * <p>Calls that differ in {@code metadataAutoCreationEnabled} or
+ * {@code useFallbackForNonPIP344Brokers} must return different futures
+ * and trigger separate connection requests.
+ *
+ * <p>Cleanup is per key. Completing one future does not affect another
+ * in-flight entry: a later call with the completed key's parameters gets a
+ * new future, while a call matching the still-pending key gets the original one.
+ */
+ @Test(timeOut = 60000)
+ public void
testPartitionedMetadataDeduplicationDifferentParameterCombinations() throws
Exception {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(30000);
+ when(client.getConfiguration()).thenReturn(conf);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+ when(client.getCnxPool()).thenReturn(cnxPool);
+
+ // Never-completing connection prevents the thenAcceptAsync callback
in getPartitionedTopicMetadata
+ // from executing, so the test exercises only the deduplication map
without network calls
+ CompletableFuture<ClientCnx> neverCompletes = new
CompletableFuture<>();
+
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(neverCompletes);
+
+ ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("lookup-test-sched"));
+
+ try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+ client, "pulsar://broker:6650", null, false, scheduler, null))
{
+
+ TopicName topic = TopicName.get("persistent://public/default/t1");
+
+ // A: metadataAutoCreationEnabled=true,
useFallbackForNonPIP344Brokers=false
+ CompletableFuture<PartitionedTopicMetadata> fa =
+ lookup.getPartitionedTopicMetadata(topic, true, false);
+ // B: metadataAutoCreationEnabled=false,
useFallbackForNonPIP344Brokers=false
+ CompletableFuture<PartitionedTopicMetadata> fb =
+ lookup.getPartitionedTopicMetadata(topic, false, false);
+
+ assertNotSame(fa, fb,
+ "Requests with different metadataAutoCreationEnabled must
not share the same future");
+ verify(cnxPool,
times(2)).getConnection(any(InetSocketAddress.class));
+
+ // Complete the future. This triggers the whenComplete callback
that removes the map entry.
+ fa.complete(new PartitionedTopicMetadata(0));
+
+ CompletableFuture<PartitionedTopicMetadata> fa2 =
+ lookup.getPartitionedTopicMetadata(topic, true, false);
+ assertNotSame(fa2, fa,
+ "After completion, a call with the same parameters must
create a new future");
+ verify(cnxPool,
times(3)).getConnection(any(InetSocketAddress.class));
+
+ // The call with (metadataAutoCreationEnabled=false,
useFallbackForNonPIP344Brokers=false) is still
+ // in flight. A new call with the same parameters must return the
same future (fb).
+ CompletableFuture<PartitionedTopicMetadata> fb2 =
+ lookup.getPartitionedTopicMetadata(topic, false, false);
+ assertSame(fb2, fb,
+ "An in-flight request with the same parameters must return
the original future");
+ verify(cnxPool,
times(3)).getConnection(any(InetSocketAddress.class));
+
+ // Also verify distinct fallback flag is treated as a separate key
+ CompletableFuture<PartitionedTopicMetadata> fc =
+ lookup.getPartitionedTopicMetadata(topic, false, true);
+ assertNotSame(fc, fb,
+ "Requests that differ in useFallbackForNonPIP344Brokers
must not share the same future");
+ verify(cnxPool,
times(4)).getConnection(any(InetSocketAddress.class));
+ } finally {
+ scheduler.shutdownNow();
+ }
+ }
}