This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8df251655a4 [improve] [client] Merge lookup requests for the same 
topic (#21232)
8df251655a4 is described below

commit 8df251655a4a5c4984aef7238a28d448f6accdf4
Author: fengyubiao <[email protected]>
AuthorDate: Fri Sep 22 16:26:04 2023 +0800

    [improve] [client] Merge lookup requests for the same topic (#21232)
    
    Motivation: Multiple consumers and producers can be maintained by the same 
Pulsar Client. In some cases, multiple consumers or producers might attempt to 
connect to the same topic. To optimize the process, it is recommended to 
perform the topic lookup only once for each topic.
    
    Modifications:
    - Merge lookup requests for the same topic.
    - Merge get partitioned metadata request for the same partitioned topic.
    
    (cherry picked from commit be4ab66bb8c5f141df6ed93de961a0216e0a4253)
---
 .../pulsar/client/api/BrokerServiceLookupTest.java | 108 ++++++++++++++++++++-
 .../client/impl/BinaryProtoLookupService.java      |  40 +++++++-
 2 files changed, 144 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 8b9691293a2..3cd01e7b756 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
@@ -47,6 +48,7 @@ import java.security.SecureRandom;
 import java.security.cert.Certificate;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -78,6 +80,9 @@ import 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -100,6 +105,7 @@ import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -508,11 +514,11 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         con.connect();
         log.info("connected url: {} ", con.getURL());
         // assert connect-url: broker2-https
-        assertEquals(new Integer(con.getURL().getPort()), 
conf2.getWebServicePortTls().get());
+        assertEquals(Integer.valueOf(con.getURL().getPort()), 
conf2.getWebServicePortTls().get());
         InputStream is = con.getInputStream();
         // assert redirect-url: broker1-https only
         log.info("redirected url: {}", con.getURL());
-        assertEquals(new Integer(con.getURL().getPort()), 
conf.getWebServicePortTls().get());
+        assertEquals(Integer.valueOf(con.getURL().getPort()), 
conf.getWebServicePortTls().get());
         is.close();
 
         loadManager1 = null;
@@ -826,6 +832,104 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testMergeGetPartitionedMetadataRequests() throws Exception {
+        // Assert the lookup service is a "BinaryProtoLookupService".
+        final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) 
pulsarClient;
+        final LookupService lookupService = pulsarClientImpl.getLookup();
+        assertTrue(lookupService instanceof BinaryProtoLookupService);
+
+        final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final int topicPartitions = 10;
+        admin.topics().createPartitionedTopic(tpName, topicPartitions);
+
+        // Verify the request is works after merge the requests.
+        List<CompletableFuture<PartitionedTopicMetadata>> futures = new 
ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName)));
+        }
+        for (CompletableFuture<PartitionedTopicMetadata> future : futures) {
+            assertEquals(future.join().partitions, topicPartitions);
+        }
+
+        // cleanup.
+        admin.topics().deletePartitionedTopic(tpName);
+    }
+
+    @Test
+    public void testMergeLookupRequests() throws Exception {
+        // Assert the lookup service is a "BinaryProtoLookupService".
+        final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) 
pulsarClient;
+        final LookupService lookupService = pulsarClientImpl.getLookup();
+        assertTrue(lookupService instanceof BinaryProtoLookupService);
+
+        final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(tpName);
+
+        // Create 1 producer and 100 consumers.
+        List<Producer<String>> producers = new ArrayList<>();
+        List<Consumer<String>> consumers = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            
producers.add(pulsarClient.newProducer(Schema.STRING).topic(tpName).create());
+        }
+        for (int i = 0; i < 20; i++) {
+            
consumers.add(pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName("s"
 + i).subscribe());
+        }
+
+        // Verify the lookup count will be smaller than before improve.
+        int lookupCountBeforeUnload = calculateLookupRequestCount();
+        admin.namespaces().unload(TopicName.get(tpName).getNamespace());
+        Awaitility.await().untilAsserted(() -> {
+            for (Producer p : producers) {
+                assertEquals(WhiteboxImpl.getInternalState(p, 
"state").toString(), "Ready");
+            }
+            for (Consumer c : consumers) {
+                assertEquals(WhiteboxImpl.getInternalState(c, 
"state").toString(), "Ready");
+            }
+        });
+        int lookupCountAfterUnload = calculateLookupRequestCount();
+        log.info("lookup count before unload: {}, after unload: {}", 
lookupCountBeforeUnload, lookupCountAfterUnload);
+        assertTrue(lookupCountAfterUnload < lookupCountBeforeUnload * 2,
+                "the lookup count should be smaller than before improve");
+
+        // Verify the producers and consumers is still works.
+        List<String> messagesSent = new ArrayList<>();
+        int index = 0;
+        for (Producer producer: producers) {
+            String message = Integer.valueOf(index++).toString();
+            producer.send(message);
+            messagesSent.add(message);
+        }
+        HashSet<String> messagesReceived = new HashSet<>();
+        for (Consumer<String> consumer : consumers) {
+            while (true) {
+                Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+                if (msg == null) {
+                    break;
+                }
+                messagesReceived.add(msg.getValue());
+            }
+        }
+        assertEquals(messagesReceived.size(), producers.size());
+
+        // cleanup.
+        for (Producer producer: producers) {
+            producer.close();
+        }
+        for (Consumer consumer : consumers) {
+            consumer.close();
+        }
+        admin.topics().delete(tpName);
+    }
+
+    private int calculateLookupRequestCount() throws Exception {
+        int failures = 
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures_total")
+                .intValue();
+        int answers = 
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers_total")
+                .intValue();
+        return failures + answers;
+    }
+
     @Test(timeOut = 10000)
     public void testPartitionedMetadataWithDeprecatedVersion() throws 
Exception {
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e0b85d7bb34..7cec76d79e5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -26,10 +26,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -56,6 +58,12 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final String listenerName;
     private final int maxLookupRedirects;
 
+    private final ConcurrentHashMap<TopicName, 
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
+            lookupInProgress = new ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<TopicName, 
CompletableFuture<PartitionedTopicMetadata>>
+            partitionedMetadataInProgress = new ConcurrentHashMap<>();
+
     public BinaryProtoLookupService(PulsarClientImpl client,
                                     String serviceUrl,
                                     boolean useTls,
@@ -92,7 +100,21 @@ public class BinaryProtoLookupService implements 
LookupService {
      * @return broker-socket-address that serves given topic
      */
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
getBroker(TopicName topicName) {
-        return findBroker(serviceNameResolver.resolveHost(), false, topicName, 
0);
+        final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
+        try {
+            return lookupInProgress.computeIfAbsent(topicName, tpName -> {
+                CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
newFuture =
+                        findBroker(serviceNameResolver.resolveHost(), false, 
topicName, 0);
+                newFutureCreated.setValue(newFuture);
+                return newFuture;
+            });
+        } finally {
+            if (newFutureCreated.getValue() != null) {
+                newFutureCreated.getValue().whenComplete((v, ex) -> {
+                    lookupInProgress.remove(topicName, 
newFutureCreated.getValue());
+                });
+            }
+        }
     }
 
     /**
@@ -100,7 +122,21 @@ public class BinaryProtoLookupService implements 
LookupService {
      *
      */
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName) {
-        return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), 
topicName);
+        final MutableObject<CompletableFuture> newFutureCreated = new 
MutableObject<>();
+        try {
+            return partitionedMetadataInProgress.computeIfAbsent(topicName, 
tpName -> {
+                CompletableFuture<PartitionedTopicMetadata> newFuture =
+                        
getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
+                newFutureCreated.setValue(newFuture);
+                return newFuture;
+            });
+        } finally {
+            if (newFutureCreated.getValue() != null) {
+                newFutureCreated.getValue().whenComplete((v, ex) -> {
+                    partitionedMetadataInProgress.remove(topicName, 
newFutureCreated.getValue());
+                });
+            }
+        }
     }
 
     private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
findBroker(InetSocketAddress socketAddress,

Reply via email to