This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14395295b49 [fix][client] Fix concurrent lookup with properties might 
have different results (#23260)
14395295b49 is described below

commit 14395295b4996dcfb7eac288d92baf1104c9c576
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 6 12:01:09 2024 +0800

    [fix][client] Fix concurrent lookup with properties might have different 
results (#23260)
---
 .../pulsar/client/api/LookupPropertiesTest.java    | 43 ++++++++++++++++++++++
 .../client/impl/BinaryProtoLookupService.java      | 20 ++++++----
 2 files changed, 55 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
index cb8b2d1e526..768dc29731f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
@@ -19,23 +19,30 @@
 package org.apache.pulsar.client.api;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.MultiBrokerBaseTest;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.client.impl.LookupTopicResult;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -72,6 +79,7 @@ public class LookupPropertiesTest extends MultiBrokerBaseTest 
{
 
     @Test
     public void testLookupProperty() throws Exception {
+        admin.namespaces().unload("public/default");
         final var topic = "test-lookup-property";
         admin.topics().createPartitionedTopic(topic, 16);
         @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
@@ -89,7 +97,35 @@ public class LookupPropertiesTest extends 
MultiBrokerBaseTest {
         Assert.assertEquals(port, 
additionalBrokers.get(0).getBrokerListenPort().orElseThrow());
     }
 
+    @Test
+    public void testConcurrentLookupProperties() throws Exception {
+        @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .build();
+        final var futures = new 
ArrayList<CompletableFuture<LookupTopicResult>>();
+        BrokerIdAwareLoadManager.clientIdList.clear();
+
+        final var clientIdList = IntStream.range(0, 10).mapToObj(i -> "key-" + 
i).toList();
+        for (var clientId : clientIdList) {
+            
client.getConfiguration().setLookupProperties(Collections.singletonMap(CLIENT_KEY,
 clientId));
+            
futures.add(client.getLookup().getBroker(TopicName.get("test-concurrent-lookup-properties")));
+            
client.getConfiguration().setLookupProperties(Collections.emptyMap());
+        }
+        FutureUtil.waitForAll(futures).get();
+        Assert.assertEquals(clientIdList, 
BrokerIdAwareLoadManager.clientIdList);
+    }
+
     public static class BrokerIdAwareLoadManager extends 
ExtensibleLoadManagerImpl {
+
+        static final List<String> clientIdList = 
Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public CompletableFuture<Optional<BrokerLookupData>> 
assign(Optional<ServiceUnitId> topic,
+                                                                    
ServiceUnitId serviceUnit, LookupOptions options) {
+            getClientId(options).ifPresent(clientIdList::add);
+            return super.assign(topic, serviceUnit, options);
+        }
+
         @Override
         public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle, Set<String> excludeBrokerSet,
                                                                LookupOptions 
options) {
@@ -106,5 +142,12 @@ public class LookupPropertiesTest extends 
MultiBrokerBaseTest {
                         .orElseGet(() -> super.selectAsync(bundle, 
excludeBrokerSet, options));
             });
         }
+
+        private static Optional<String> getClientId(LookupOptions options) {
+            if (options.getProperties() == null) {
+                return Optional.empty();
+            }
+            return 
Optional.ofNullable(options.getProperties().get(CLIENT_KEY));
+        }
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 9dd04acce7e..b45d6e9f6a8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.opentelemetry.api.common.Attributes;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
@@ -60,7 +62,7 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final String listenerName;
     private final int maxLookupRedirects;
 
-    private final ConcurrentHashMap<TopicName, 
CompletableFuture<LookupTopicResult>>
+    private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, 
CompletableFuture<LookupTopicResult>>
             lookupInProgress = new ConcurrentHashMap<>();
 
     private final ConcurrentHashMap<TopicName, 
CompletableFuture<PartitionedTopicMetadata>>
@@ -118,10 +120,12 @@ public class BinaryProtoLookupService implements 
LookupService {
     public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) 
{
         long startTime = System.nanoTime();
         final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
+        final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
+                client.getConfiguration().getLookupProperties());
         try {
-            return lookupInProgress.computeIfAbsent(topicName, tpName -> {
-                CompletableFuture<LookupTopicResult> newFuture =
-                        findBroker(serviceNameResolver.resolveHost(), false, 
topicName, 0);
+            return lookupInProgress.computeIfAbsent(key, tpName -> {
+                CompletableFuture<LookupTopicResult> newFuture = 
findBroker(serviceNameResolver.resolveHost(), false,
+                        topicName, 0, key.getRight());
                 newFutureCreated.setValue(newFuture);
 
                 newFuture.thenRun(() -> {
@@ -135,7 +139,7 @@ public class BinaryProtoLookupService implements 
LookupService {
         } finally {
             if (newFutureCreated.getValue() != null) {
                 newFutureCreated.getValue().whenComplete((v, ex) -> {
-                    lookupInProgress.remove(topicName, 
newFutureCreated.getValue());
+                    lookupInProgress.remove(key, newFutureCreated.getValue());
                 });
             }
         }
@@ -167,7 +171,7 @@ public class BinaryProtoLookupService implements 
LookupService {
     }
 
     private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress 
socketAddress,
-            boolean authoritative, TopicName topicName, final int 
redirectCount) {
+            boolean authoritative, TopicName topicName, final int 
redirectCount, Map<String, String> properties) {
         CompletableFuture<LookupTopicResult> addressFuture = new 
CompletableFuture<>();
 
         if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
@@ -179,7 +183,7 @@ public class BinaryProtoLookupService implements 
LookupService {
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newLookup(topicName.toString(), 
listenerName, authoritative, requestId,
-                    client.getConfiguration().getLookupProperties());
+                    properties);
             clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
                 if (t != null) {
                     // lookup failed
@@ -204,7 +208,7 @@ public class BinaryProtoLookupService implements 
LookupService {
 
                         // (2) redirect to given address if response is: 
redirect
                         if (r.redirect) {
-                            findBroker(responseBrokerAddress, r.authoritative, 
topicName, redirectCount + 1)
+                            findBroker(responseBrokerAddress, r.authoritative, 
topicName, redirectCount + 1, properties)
                                 .thenAccept(addressFuture::complete)
                                 .exceptionally((lookupException) -> {
                                     Throwable cause = 
FutureUtil.unwrapCompletionException(lookupException);

Reply via email to