This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 27f57cf4d8087694a530aa636d37cd7b803d6cce Author: Vinkal <[email protected]> AuthorDate: Tue Nov 11 19:09:24 2025 +0530 [fix][client] Fix deduplication for getPartitionedTopicMetadata to include method parameters (#24965) Signed-off-by: Vinkal Chudgar <[email protected]> (cherry picked from commit 0cdab922230d9d39d0eec510e66e5309df134591) --- .../client/impl/BinaryProtoLookupService.java | 52 +++++++- .../client/impl/BinaryProtoLookupServiceTest.java | 142 +++++++++++++++++++++ 2 files changed, 191 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 4709a22e1c4..2365ffdfa11 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -26,6 +26,7 @@ import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -69,7 +70,7 @@ public class BinaryProtoLookupService implements LookupService { private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>> lookupInProgress = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>> + private final ConcurrentHashMap<PartitionedTopicMetadataKey, CompletableFuture<PartitionedTopicMetadata>> partitionedMetadataInProgress = new ConcurrentHashMap<>(); private final LatencyHistogram histoGetBroker; @@ -188,8 +189,10 @@ public class BinaryProtoLookupService implements LookupService { public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata( TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>(); + final PartitionedTopicMetadataKey key = new PartitionedTopicMetadataKey( + topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); try { - return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { + return partitionedMetadataInProgress.computeIfAbsent(key, k -> { CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadataAsync( topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); @@ -199,7 +202,7 @@ public class BinaryProtoLookupService implements LookupService { } finally { if (newFutureCreated.getValue() != null) { newFutureCreated.getValue().whenComplete((v, ex) -> { - partitionedMetadataInProgress.remove(topicName, newFutureCreated.getValue()); + partitionedMetadataInProgress.remove(key, newFutureCreated.getValue()); }); } } @@ -497,5 +500,48 @@ public class BinaryProtoLookupService implements LookupService { } + private static final class PartitionedTopicMetadataKey { + private final TopicName topicName; + private final boolean metadataAutoCreationEnabled; + private final boolean useFallbackForNonPIP344Brokers; + + PartitionedTopicMetadataKey(TopicName topicName, + boolean metadataAutoCreationEnabled, + boolean useFallbackForNonPIP344Brokers) { + this.topicName = topicName; + this.metadataAutoCreationEnabled = metadataAutoCreationEnabled; + this.useFallbackForNonPIP344Brokers = useFallbackForNonPIP344Brokers; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o; + return metadataAutoCreationEnabled == that.metadataAutoCreationEnabled + && useFallbackForNonPIP344Brokers == that.useFallbackForNonPIP344Brokers + && Objects.equals(topicName, that.topicName); + } + + @Override + public int hashCode() { + return Objects.hash(topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); + } + + @Override + public String toString() { + return "PartitionedTopicMetadataKey{" + + "topicName=" + topicName + + ", metadataAutoCreationEnabled=" + metadataAutoCreationEnabled + + ", useFallbackForNonPIP344Brokers=" + useFallbackForNonPIP344Brokers + + '}'; + } + } + + private static final Logger log = LoggerFactory.getLogger(BinaryProtoLookupService.class); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index abd17903bc4..7dc5e169801 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -24,9 +24,13 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; @@ -37,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.PulsarClientException.LookupException; @@ -46,6 +51,7 @@ import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; @@ -194,4 +200,140 @@ public class BinaryProtoLookupServiceTest { return lookupResult; } + + /** + * Verifies that getPartitionedTopicMetadata() deduplicates concurrent requests and cleans up after completion. + * + * First, two concurrent calls with identical parameters (topicName, metadataAutoCreationEnabled, + * useFallbackForNonPIP344Brokers) should return the same CompletableFuture and trigger only one connection pool + * request (deduplication). + * + * Second, after the future completes, the map entry should be removed so a subsequent call + * with the same parameters creates a new future (cleanup). + * + * This test uses a never-completing connection future to isolate the deduplication logic + * without executing the network request path. + */ + @Test(timeOut = 60000) + public void testPartitionedMetadataDeduplicationAndCleanup() throws Exception { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool cnxPool = mock(ConnectionPool.class); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(30000); + when(client.getConfiguration()).thenReturn(conf); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); + when(client.getCnxPool()).thenReturn(cnxPool); + + // Never-completing connection prevents the thenAcceptAsync callback in getPartitionedTopicMetadata + // from executing, isolating only the deduplication logic without network calls. + CompletableFuture<ClientCnx> neverCompletes = new CompletableFuture<>(); + when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes); + + ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); + + try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, null)) { + + TopicName topic = TopicName.get("persistent://public/default/t1"); + boolean metadataAutoCreationEnabled = true; + boolean useFallbackForNonPIP344Brokers = false; + + CompletableFuture<PartitionedTopicMetadata> f1 = lookup.getPartitionedTopicMetadata(topic, + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); + CompletableFuture<PartitionedTopicMetadata> f1b = lookup.getPartitionedTopicMetadata(topic, + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); + + // Dedup: same parameters share the same future and one getConnection call. + assertSame(f1b, f1, + "Concurrent requests with identical parameters should return the same future"); + verify(cnxPool, times(1)).getConnection(any(ServiceNameResolver.class)); + + // Complete the future. This triggers the whenComplete callback that removes the map entry. + f1.complete(new PartitionedTopicMetadata(0)); + assertTrue(f1.isDone()); + + // Verify cleanup: after completion, same parameters create a new future and + // trigger another connection request. + CompletableFuture<PartitionedTopicMetadata> f2 = lookup.getPartitionedTopicMetadata(topic, + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); + org.testng.Assert.assertNotSame(f2, f1, + "After completion, the deduplication map entry should be removed and a new future created"); + verify(cnxPool, times(2)).getConnection(any(ServiceNameResolver.class)); + } finally { + scheduler.shutdownNow(); + } + } + + /** + * Verifies that different parameter combinations are treated as distinct keys. + * + * Calls that differ in metadataAutoCreationEnabled or useFallbackForNonPIP344Brokers must return different futures + * and trigger separate connection requests. + * + * Cleanup is per key. Completing one does not affect another in-flight entry. + */ + @Test(timeOut = 60000) + public void testPartitionedMetadataDeduplicationDifferentParameterCombinations() throws Exception { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ConnectionPool cnxPool = mock(ConnectionPool.class); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(30000); + when(client.getConfiguration()).thenReturn(conf); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); + when(client.getCnxPool()).thenReturn(cnxPool); + + // Never-completing connection prevents the thenAcceptAsync callback in getPartitionedTopicMetadata + // from executing, so the test exercises only the deduplication map without network calls + CompletableFuture<ClientCnx> neverCompletes = new CompletableFuture<>(); + when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes); + + ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); + + try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, null)) { + + TopicName topic = TopicName.get("persistent://public/default/t1"); + + // A: metadataAutoCreationEnabled=true, useFallbackForNonPIP344Brokers=false + CompletableFuture<PartitionedTopicMetadata> fa = + lookup.getPartitionedTopicMetadata(topic, true, false); + // B: metadataAutoCreationEnabled=false, useFallbackForNonPIP344Brokers=false + CompletableFuture<PartitionedTopicMetadata> fb = + lookup.getPartitionedTopicMetadata(topic, false, false); + + assertNotSame(fa, fb, + "Requests with different metadataAutoCreationEnabled must not share the same future"); + verify(cnxPool, times(2)).getConnection(any(ServiceNameResolver.class)); + + // Complete the future. This triggers the whenComplete callback that removes the map entry. + fa.complete(new PartitionedTopicMetadata(0)); + + CompletableFuture<PartitionedTopicMetadata> fa2 = + lookup.getPartitionedTopicMetadata(topic, true, false); + assertNotSame(fa2, fa, + "After completion, a call with the same parameters must create a new future"); + verify(cnxPool, times(3)).getConnection(any(ServiceNameResolver.class)); + + // The call with (metadataAutoCreationEnabled=false, useFallbackForNonPIP344Brokers=false) is still + // in flight. A new call with the same parameters must return the same future (fb). + CompletableFuture<PartitionedTopicMetadata> fb2 = + lookup.getPartitionedTopicMetadata(topic, false, false); + assertSame(fb2, fb, + "An in-flight request with the same parameters must return the original future"); + verify(cnxPool, times(3)).getConnection(any(ServiceNameResolver.class)); + + // Also verify distinct fallback flag is treated as a separate key + CompletableFuture<PartitionedTopicMetadata> fc = + lookup.getPartitionedTopicMetadata(topic, false, true); + assertNotSame(fc, fb, + "Requests that differ in useFallbackForNonPIP344Brokers must not share the same future"); + verify(cnxPool, times(4)).getConnection(any(ServiceNameResolver.class)); + } finally { + scheduler.shutdownNow(); + } + } }
