This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 27f57cf4d8087694a530aa636d37cd7b803d6cce
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 11 19:09:24 2025 +0530

    [fix][client] Fix deduplication for getPartitionedTopicMetadata to include method parameters (#24965)
    
    Signed-off-by: Vinkal Chudgar <[email protected]>
    (cherry picked from commit 0cdab922230d9d39d0eec510e66e5309df134591)
---
 .../client/impl/BinaryProtoLookupService.java      |  52 +++++++-
 .../client/impl/BinaryProtoLookupServiceTest.java  | 142 +++++++++++++++++++++
 2 files changed, 191 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 4709a22e1c4..2365ffdfa11 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -26,6 +26,7 @@ import io.opentelemetry.api.common.Attributes;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,7 +70,7 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, 
CompletableFuture<LookupTopicResult>>
             lookupInProgress = new ConcurrentHashMap<>();
 
-    private final ConcurrentHashMap<TopicName, 
CompletableFuture<PartitionedTopicMetadata>>
+    private final ConcurrentHashMap<PartitionedTopicMetadataKey, 
CompletableFuture<PartitionedTopicMetadata>>
             partitionedMetadataInProgress = new ConcurrentHashMap<>();
 
     private final LatencyHistogram histoGetBroker;
@@ -188,8 +189,10 @@ public class BinaryProtoLookupService implements 
LookupService {
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(
             TopicName topicName, boolean metadataAutoCreationEnabled, boolean 
useFallbackForNonPIP344Brokers) {
         final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
+        final PartitionedTopicMetadataKey key = new 
PartitionedTopicMetadataKey(
+                topicName, metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
         try {
-            return partitionedMetadataInProgress.computeIfAbsent(topicName, 
tpName -> {
+            return partitionedMetadataInProgress.computeIfAbsent(key, k -> {
                 CompletableFuture<PartitionedTopicMetadata> newFuture = 
getPartitionedTopicMetadataAsync(
                        topicName, metadataAutoCreationEnabled,
                         useFallbackForNonPIP344Brokers);
@@ -199,7 +202,7 @@ public class BinaryProtoLookupService implements 
LookupService {
         } finally {
             if (newFutureCreated.getValue() != null) {
                 newFutureCreated.getValue().whenComplete((v, ex) -> {
-                    partitionedMetadataInProgress.remove(topicName, 
newFutureCreated.getValue());
+                    partitionedMetadataInProgress.remove(key, 
newFutureCreated.getValue());
                 });
             }
         }
@@ -497,5 +500,48 @@ public class BinaryProtoLookupService implements 
LookupService {
 
     }
 
+    /**
+     * Composite key for the {@code partitionedMetadataInProgress} map.
+     *
+     * Combines the topic name with the two boolean arguments of
+     * {@code getPartitionedTopicMetadata}. Two keys are equal only when all
+     * three components are equal, so in-flight requests that differ in
+     * either flag are tracked under separate map entries.
+     */
+    private static final class PartitionedTopicMetadataKey {
+        private final TopicName topicName;
+        private final boolean metadataAutoCreationEnabled;
+        private final boolean useFallbackForNonPIP344Brokers;
+
+        /**
+         * Stores the three request parameters unchanged; no validation is
+         * performed here.
+         */
+        PartitionedTopicMetadataKey(TopicName topicName,
+                               boolean metadataAutoCreationEnabled,
+                               boolean useFallbackForNonPIP344Brokers) {
+            this.topicName = topicName;
+            this.metadataAutoCreationEnabled = metadataAutoCreationEnabled;
+            this.useFallbackForNonPIP344Brokers = 
useFallbackForNonPIP344Brokers;
+        }
+
+        /**
+         * Compares both flags and the topic name; only instances of exactly
+         * this class can be equal.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o;
+            // Primitive flags are compared first; Objects.equals tolerates
+            // a null topicName.
+            return metadataAutoCreationEnabled == 
that.metadataAutoCreationEnabled
+                    && useFallbackForNonPIP344Brokers == 
that.useFallbackForNonPIP344Brokers
+                    && Objects.equals(topicName, that.topicName);
+        }
+
+        /**
+         * Hashes the same three components that {@link #equals(Object)}
+         * compares.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(topicName, metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
+        }
+
+        /**
+         * Renders all three components in a {@code Name{field=value, ...}}
+         * form.
+         */
+        @Override
+        public String toString() {
+            return "PartitionedTopicMetadataKey{"
+                    + "topicName=" + topicName
+                    + ", metadataAutoCreationEnabled=" + 
metadataAutoCreationEnabled
+                    + ", useFallbackForNonPIP344Brokers=" + 
useFallbackForNonPIP344Brokers
+                    + '}';
+        }
+    }
+
+
     private static final Logger log = 
LoggerFactory.getLogger(BinaryProtoLookupService.class);
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index abd17903bc4..7dc5e169801 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -24,9 +24,13 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
@@ -37,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.PulsarClientException.LookupException;
@@ -46,6 +51,7 @@ import 
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
@@ -194,4 +200,140 @@ public class BinaryProtoLookupServiceTest {
 
         return lookupResult;
     }
+
+    /**
+     * Verifies that getPartitionedTopicMetadata() deduplicates concurrent 
requests and cleans up after completion.
+     *
+     * First, two concurrent calls with identical parameters (topicName, 
metadataAutoCreationEnabled,
+     * useFallbackForNonPIP344Brokers) should return the same 
CompletableFuture and trigger only one connection pool
+     * request (deduplication).
+     *
+     * Second, after the future completes, the map entry should be removed so 
a subsequent call
+     * with the same parameters creates a new future (cleanup).
+     *
+     * This test uses a never-completing connection future to isolate the 
deduplication logic
+     * without executing the network request path.
+     */
+    @Test(timeOut = 60000)
+    public void testPartitionedMetadataDeduplicationAndCleanup() throws 
Exception {
+        // Mocked client wired to a mocked connection pool, so every
+        // getConnection call can be counted with verify().
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(30000);
+        when(client.getConfiguration()).thenReturn(conf);
+        when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+        when(client.getCnxPool()).thenReturn(cnxPool);
+
+        // Never-completing connection prevents the thenAcceptAsync callback 
in getPartitionedTopicMetadata
+        // from executing, isolating only the deduplication logic without 
network calls.
+        CompletableFuture<ClientCnx> neverCompletes = new 
CompletableFuture<>();
+        
when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes);
+
+        ScheduledExecutorService scheduler =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
+
+        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, null)) 
{
+
+            TopicName topic = TopicName.get("persistent://public/default/t1");
+            boolean metadataAutoCreationEnabled = true;
+            boolean useFallbackForNonPIP344Brokers = false;
+
+            CompletableFuture<PartitionedTopicMetadata> f1 = 
lookup.getPartitionedTopicMetadata(topic,
+                    metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
+            CompletableFuture<PartitionedTopicMetadata> f1b = 
lookup.getPartitionedTopicMetadata(topic,
+                    metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
+
+            // Dedup: same parameters share the same future and one 
getConnection call.
+            assertSame(f1b, f1,
+                    "Concurrent requests with identical parameters should 
return the same future");
+            verify(cnxPool, 
times(1)).getConnection(any(ServiceNameResolver.class));
+
+            // Complete the future. This triggers the whenComplete callback 
that removes the map entry.
+            f1.complete(new PartitionedTopicMetadata(0));
+            assertTrue(f1.isDone());
+
+            // Verify cleanup: after completion, same parameters create a new 
future and
+            // trigger another connection request.
+            CompletableFuture<PartitionedTopicMetadata> f2 = 
lookup.getPartitionedTopicMetadata(topic,
+                    metadataAutoCreationEnabled, 
useFallbackForNonPIP344Brokers);
+            assertNotSame(f2, f1,
+                    "After completion, the deduplication map entry should be 
removed and a new future created");
+            verify(cnxPool, 
times(2)).getConnection(any(ServiceNameResolver.class));
+        } finally {
+            scheduler.shutdownNow();
+        }
+    }
+
+    /**
+     * Verifies that different parameter combinations are treated as distinct 
keys.
+     *
+     * Calls that differ in metadataAutoCreationEnabled or 
useFallbackForNonPIP344Brokers must return different futures
+     * and trigger separate connection requests.
+     *
+     * Cleanup is per key. Completing one does not affect another in-flight 
entry.
+     */
+    @Test(timeOut = 60000)
+    public void 
testPartitionedMetadataDeduplicationDifferentParameterCombinations() throws 
Exception {
+        // Mocked client wired to a mocked connection pool, so every
+        // getConnection call can be counted with verify().
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(30000);
+        when(client.getConfiguration()).thenReturn(conf);
+        when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+        when(client.getCnxPool()).thenReturn(cnxPool);
+
+        // Never-completing connection prevents the thenAcceptAsync callback 
in getPartitionedTopicMetadata
+        // from executing, so the test exercises only the deduplication map 
without network calls.
+        CompletableFuture<ClientCnx> neverCompletes = new 
CompletableFuture<>();
+        
when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes);
+
+        // Scheduler handed to the lookup service; shut down in the finally
+        // block below.
+        ScheduledExecutorService scheduler =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
+
+        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, null)) 
{
+
+            TopicName topic = TopicName.get("persistent://public/default/t1");
+
+            // A: metadataAutoCreationEnabled=true, 
useFallbackForNonPIP344Brokers=false
+            CompletableFuture<PartitionedTopicMetadata> fa =
+                    lookup.getPartitionedTopicMetadata(topic, true, false);
+            // B: metadataAutoCreationEnabled=false, 
useFallbackForNonPIP344Brokers=false
+            CompletableFuture<PartitionedTopicMetadata> fb =
+                    lookup.getPartitionedTopicMetadata(topic, false, false);
+
+            assertNotSame(fa, fb,
+                    "Requests with different metadataAutoCreationEnabled must 
not share the same future");
+            verify(cnxPool, 
times(2)).getConnection(any(ServiceNameResolver.class));
+
+            // Complete the future. This triggers the whenComplete callback 
that removes the map entry.
+            fa.complete(new PartitionedTopicMetadata(0));
+
+            CompletableFuture<PartitionedTopicMetadata> fa2 =
+                    lookup.getPartitionedTopicMetadata(topic, true, false);
+            assertNotSame(fa2, fa,
+                    "After completion, a call with the same parameters must 
create a new future");
+            verify(cnxPool, 
times(3)).getConnection(any(ServiceNameResolver.class));
+
+            // The call with (metadataAutoCreationEnabled=false, 
useFallbackForNonPIP344Brokers=false) is still
+            // in flight. A new call with the same parameters must return the 
same future (fb).
+            CompletableFuture<PartitionedTopicMetadata> fb2 =
+                    lookup.getPartitionedTopicMetadata(topic, false, false);
+            assertSame(fb2, fb,
+                    "An in-flight request with the same parameters must return 
the original future");
+            // Count stays at 3: the deduplicated call must not open another
+            // connection request.
+            verify(cnxPool, 
times(3)).getConnection(any(ServiceNameResolver.class));
+
+            // Also verify distinct fallback flag is treated as a separate key
+            CompletableFuture<PartitionedTopicMetadata> fc =
+                    lookup.getPartitionedTopicMetadata(topic, false, true);
+            assertNotSame(fc, fb,
+                    "Requests that differ in useFallbackForNonPIP344Brokers 
must not share the same future");
+            verify(cnxPool, 
times(4)).getConnection(any(ServiceNameResolver.class));
+        } finally {
+            scheduler.shutdownNow();
+        }
+    }
 }

Reply via email to