This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d90da78a419 [improve][client] Deduplicate getTopicsUnderNamespace in BinaryProtoLookupService (#24962)
d90da78a419 is described below

commit d90da78a419b4ff48ce3ab034f4e4a6fdee6a7c9
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 11 21:06:54 2025 +0530

    [improve][client] Deduplicate getTopicsUnderNamespace in BinaryProtoLookupService (#24962)
    
    Signed-off-by: Vinkal Chudgar <[email protected]>
    (cherry picked from commit 190273590e609d446c0c77cc55ce7e5f35efd24f)
---
 .../client/impl/BinaryProtoLookupService.java      |  84 +++++++++++--
 .../client/impl/BinaryProtoLookupServiceTest.java  | 133 +++++++++++++++++++++
 2 files changed, 206 insertions(+), 11 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 2c520e06d36..a8c8d1d1d42 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -73,6 +73,9 @@ public class BinaryProtoLookupService implements LookupService {
     private final ConcurrentHashMap<PartitionedTopicMetadataKey, 
CompletableFuture<PartitionedTopicMetadata>>
             partitionedMetadataInProgress = new ConcurrentHashMap<>();
 
+    private final ConcurrentHashMap<TopicsUnderNamespaceKey, 
CompletableFuture<GetTopicsResult>>
+            topicsUnderNamespaceInProgress = new ConcurrentHashMap<>();
+
     private final LatencyHistogram histoGetBroker;
     private final LatencyHistogram histoGetTopicMetadata;
     private final LatencyHistogram histoGetSchema;
@@ -398,17 +401,31 @@ public class BinaryProtoLookupService implements LookupService {
                                                                                
   Mode mode,
                                                                                
   String topicsPattern,
                                                                                
   String topicsHash) {
-        CompletableFuture<GetTopicsResult> topicsFuture = new 
CompletableFuture<>();
-
-        AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                .setMax(1, TimeUnit.MINUTES)
-                .create();
-        getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, 
backoff, opTimeoutMs, topicsFuture, mode,
-                topicsPattern, topicsHash);
-        return topicsFuture;
+        final MutableObject<CompletableFuture<GetTopicsResult>> 
newFutureCreated = new MutableObject<>();
+        final TopicsUnderNamespaceKey key = new 
TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash);
+
+        try {
+            return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
+                CompletableFuture<GetTopicsResult> topicsFuture = new 
CompletableFuture<>();
+                AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+                Backoff backoff = new BackoffBuilder()
+                        .setInitialTime(100, TimeUnit.MILLISECONDS)
+                        .setMandatoryStop(opTimeoutMs.get() * 2, 
TimeUnit.MILLISECONDS)
+                        .setMax(1, TimeUnit.MINUTES)
+                        .create();
+
+                newFutureCreated.setValue(topicsFuture);
+                getTopicsUnderNamespace(serviceNameResolver.resolveHost(), 
namespace, backoff, opTimeoutMs,
+                        topicsFuture, mode, topicsPattern, topicsHash);
+                return topicsFuture;
+            });
+        } finally {
+            if (newFutureCreated.getValue() != null) {
+                newFutureCreated.getValue().whenComplete((v, ex) -> {
+                    topicsUnderNamespaceInProgress.remove(key, 
newFutureCreated.getValue());
+                });
+            }
+        }
     }
 
     private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
@@ -499,6 +516,51 @@ public class BinaryProtoLookupService implements LookupService {
 
     }
 
+    private static final class TopicsUnderNamespaceKey {
+        private final NamespaceName namespace;
+        private final Mode mode;
+        private final String topicsPattern;
+        private final String topicsHash;
+
+        TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode,
+                                String topicsPattern, String topicsHash) {
+            this.namespace = namespace;
+            this.mode = mode;
+            this.topicsPattern = topicsPattern;
+            this.topicsHash = topicsHash;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
+            return Objects.equals(namespace, that.namespace)
+                    && mode == that.mode
+                    && Objects.equals(topicsPattern, that.topicsPattern)
+                    && Objects.equals(topicsHash, that.topicsHash);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(namespace, mode, topicsPattern, topicsHash);
+        }
+
+        @Override
+        public String toString() {
+            return "TopicsUnderNamespaceKey{"
+                    + "namespace=" + namespace
+                    + ", mode=" + mode
+                    + ", topicsPattern='" + topicsPattern + '\''
+                    + ", topicsHash='" + topicsHash + '\''
+                    + '}';
+        }
+    }
+
     private static final class PartitionedTopicMetadataKey {
         private final TopicName topicName;
         private final boolean metadataAutoCreationEnabled;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 3661492bfe3..0a121a26b30 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -50,6 +50,9 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.common.lookup.GetTopicsResult;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
@@ -200,6 +203,136 @@ public class BinaryProtoLookupServiceTest {
         return lookupResult;
     }
 
+    /**
+     * Verifies that getTopicsUnderNamespace() deduplicates concurrent 
requests and cleans up after completion.
+     *
+     * First, two concurrent calls with identical parameters should return the 
same CompletableFuture
+     * and trigger only one connection pool request (deduplication).
+     *
+     * Second, after the future completes, the map entry should be removed so 
a subsequent call
+     * with the same parameters creates a new future (cleanup).
+     *
+     * This test uses a never-completing connection future to isolate the 
deduplication logic
+     * without executing the network request path.
+     */
+
+    @Test(timeOut = 60000)
+    public void testGetTopicsUnderNamespaceDeduplication() throws Exception {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(30000);
+        when(client.getConfiguration()).thenReturn(conf);
+        when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+        when(client.getCnxPool()).thenReturn(cnxPool);
+
+        // Never-completing connection prevents the thenAcceptAsync callback 
in getTopicsUnderNamespace from executing,
+        // isolating only the deduplication logic without network calls.
+        CompletableFuture<ClientCnx> neverCompletes = new 
CompletableFuture<>();
+        
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(neverCompletes);
+
+        ScheduledExecutorService scheduler =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
+
+        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, 
/*lookupPinnedExecutor*/ null)) {
+
+            NamespaceName ns = NamespaceName.get("public", "default");
+            Mode mode = Mode.PERSISTENT;
+            String pattern = ".*";
+            String topicsHash = null;
+
+            CompletableFuture<GetTopicsResult> f1 = 
lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
+            CompletableFuture<GetTopicsResult> f1b = 
lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
+
+            assertSame(f1b, f1, "Concurrent requests with identical parameters 
should return the same future");
+
+            verify(cnxPool, 
times(1)).getConnection(any(InetSocketAddress.class));
+
+            GetTopicsResult payload = new 
GetTopicsResult(java.util.Collections.emptyList(), null, false, true);
+
+            // Complete the future. This triggers the whenComplete callback 
that removes the map entry.
+            f1.complete(payload);
+            assertTrue(f1.isDone());
+
+            // Verify cleanup: subsequent call with same parameters creates a 
new future.
+            CompletableFuture<GetTopicsResult> f2 = 
lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
+            assertNotSame(f2, f1,
+                    "After completion, the deduplication map entry should be 
removed and a new future created");
+            verify(cnxPool, 
times(2)).getConnection(any(InetSocketAddress.class));
+        } finally {
+            scheduler.shutdownNow();
+        }
+    }
+
+    /**
+     * Verifies that getTopicsUnderNamespace() treats different topicsHash 
values as distinct keys for deduplication.
+     *
+     * Requests with different topicsHash values should create separate 
futures and trigger separate connection
+     * pool requests. Cleanup is per key. Completing one future does not 
affect another in-flight entry.
+     *
+     * This test uses a never-completing connection future to isolate the 
deduplication logic without executing
+     * the network request path.
+     */
+    @Test(timeOut = 60000)
+    public void testGetTopicsUnderNamespaceDeduplicationDifferentHash() throws 
Exception {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool cnxPool = mock(ConnectionPool.class);
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(30000);
+        when(client.getConfiguration()).thenReturn(conf);
+        when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+        when(client.getCnxPool()).thenReturn(cnxPool);
+
+        // Never-completing connection prevents the thenAcceptAsync callback 
in getTopicsUnderNamespace from executing,
+        // isolating only the deduplication logic without network calls.
+        CompletableFuture<ClientCnx> neverCompletes = new 
CompletableFuture<>();
+        
when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(neverCompletes);
+
+        ScheduledExecutorService scheduler =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("lookup-test-sched"));
+
+        try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
+                client, "pulsar://broker:6650", null, false, scheduler, null)) 
{
+
+            NamespaceName ns = NamespaceName.get("public", "default");
+            Mode mode = Mode.PERSISTENT;
+            String pattern = ".*";
+
+            CompletableFuture<GetTopicsResult> futureHashA = 
lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA");
+            CompletableFuture<GetTopicsResult> futureHashB = 
lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB");
+
+            // Verify different hash values create separate futures.
+            assertNotSame(futureHashA, futureHashB,
+                    "Requests with different topicsHash must not share the 
same future");
+
+            // Verify connection pool called twice, once for each distinct 
topicsHash.
+            verify(cnxPool, 
times(2)).getConnection(any(InetSocketAddress.class));
+
+            GetTopicsResult payload = new 
GetTopicsResult(java.util.Collections.emptyList(), null, false, true);
+
+            futureHashA.complete(payload);
+
+            // Verify cleanup for HashA: subsequent call creates a new future.
+            CompletableFuture<GetTopicsResult> futureHashA2 =
+                    lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA");
+            assertNotSame(futureHashA2, futureHashA,
+                    "After completion, a call with the same topicsHash must 
create a new future");
+            verify(cnxPool, 
times(3)).getConnection(any(InetSocketAddress.class));
+
+            // Verify HashB still in-flight: subsequent call returns the 
original future.
+            CompletableFuture<GetTopicsResult> futureHashB2 =
+                    lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB");
+            assertSame(futureHashB2, futureHashB,
+                    "An in-flight request for the same topicsHash must return 
the same future");
+            verify(cnxPool, 
times(3)).getConnection(any(InetSocketAddress.class));
+        } finally {
+            scheduler.shutdownNow();
+        }
+    }
+
     /**
      * Verifies that getPartitionedTopicMetadata() deduplicates concurrent 
requests and cleans up after completion.
      *

Reply via email to