This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new dd7aedf  Created PIP 35: Improve topic lookup for topics that have 
high number of partitions (markdown)
dd7aedf is described below

commit dd7aedf3a304529c0fd2637fdc0022dfec288ea8
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Apr 11 14:17:10 2019 +0800

    Created PIP 35: Improve topic lookup for topics that have high number of 
partitions (markdown)
---
 ...r-topics-that-have-high-number-of-partitions.md | 112 +++++++++++++++++++++
 1 file changed, 112 insertions(+)

diff --git 
a/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
 
b/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
new file mode 100644
index 0000000..09ba62d
--- /dev/null
+++ 
b/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
@@ -0,0 +1,112 @@
+ * **Status**: Proposal
+ * **Author**: Zhengxin Cai
+ * **Pull Request**: 
+ * **Mailing List discussion**: 
http://mail-archives.apache.org/mod_mbox/pulsar-dev/201904.mbox/%3CCAHQi0xemFAGC6P9e240-8Ho0z4qtUv0uSBLtHfSs5ZeBDJXEug%40mail.gmail.com%3E
+
+## Motivation
+
+As this [github issue](https://github.com/apache/pulsar/issues/1088) brings 
up, for a partitioned topic, client has to send LookUp request for each 
partition. Here we propose a solution to make lookup easier for partitioned 
topic.
+
+We could either make current LookUp(getBroker) api smarter and it will check 
if topic is a partitioned topic or not. If it’s a normal topic just return 
normal LookUp response which will be Pair<InetSocketAddress, 
InetSocketAddress>, if it’s a partitioned topic then it will return 
+List<Pair<InetSocketAddress, InetSocketAddress>> contains all brokers that 
serving that partitioned topic.
+
+Or we could create a new BatchLookUp api, like getBrokers. It can take a list 
of topic as input, where each topic has to be either a non partitioned topic or 
a partition of a partitioned topic and it’ll return Map<TopicName, 
Pair<InetSocketAddress, InetSocketAddress>> where it’s a map of input topic to 
corresponding address.
+
+Prefer to go with second approach as it not only support lookup for all 
partitions of partitioned topic, it also support bulk lookup for many normal 
topics in one request.
+
+## Proposal
+
+In TopicLookupBase is where the core lookup logic will happens. 
+Add new lookupTopicBatchAsync,  within the method, after validation, for each 
topic, we’ll fist find out the bundle it belong, then invoke 
NameSpaceService.findBrokerServiceUrl for each bundle and create a Map<Bundle , 
List<TopicName>>.
+Then iterate through map and create response for each topicname.
+
+In LookupService, when findBrokers invoke newBatchLookup,  it’ll get a 
response of a list<lookupresponse>, where some of response contains redirect, 
for these response will invoke findBrokers again to contact correct cluster. 
Combine the result of 2 invocation to get a final result.
+
+Here’s proposed change to protocal buffer definition and apis:
+
+*PulsarApi.proto*
+
+```
+       Add 
+       Type enum:
+               BATCH_LOOKUP           = 23;
+               BATCH_LOOKUP_RESPONSE  = 24;
+
+       BaseBommand:
+               optional CommandBatchLookupTopic lookupTopic                    
= 23;
+               optional CommandBatchLookupTopicResponse lookupTopicResponse    
= 24;
+
+       CommandBatchLookupTopic:
+       {
+               repeated string topics           = 1;
+               required uint64 request_id       = 2;
+       }
+
+       CommandTopicResponse:
+       {       
+               repeated string topicname               = 10;
+       }
+```
+
+
+
+- Commands.java
+       - Add newBatchLookUp
+
+- ClientCnx.java
+       - Add newBatchLookup/handleBatchLookUpResponse
+
+- ServiceCnx.java
+       - Add handleBatchLookup
+
+- LookupProxyHandler.java   
+       - Add handleBatchLookup
+
+- ProxyConnection.java
+       - Add handleBatchLookup
+
+- PulsarDecoder.java
+       - Handle batchLookup
+
+- Add BatchLookupResult.java
+
+- TopicLookupBase.java
+       - Add lookupTopicBatchAsync
+```
+List<CompletableFuture<ByteBuf>> lookupTopicBatchAsync(list<topics> topics) {
+       List<CompletableFuture> resultFutures;
+List<CompletableFuture> futures;
+       Map<Bundle.toString, List<Topic> map;
+            topics.foreach((topic) -> {
+CompletableFuture validateFuture;
+            CompletableFuture getNameSpaceBundleFuture;
+CompletableFuture lookupFuture;
+
+getClusterDataIfDifferentCluster.thenAccept(validateFuture.complete)
+       validateFuture.thenAccept(validaitonFailureResponse -> {
+               if(validaitonFailureResponse != null)
+                       lookupFuture.complete(validaitonFailureResponse)
+pulsarService.getNamespaceService().getBundleAsync().
+thenAccept(bundle -> map.get(bundle).add(requestId))
+
+getNameSpaceBundleFuture.complete())
+
+futures.add(getNameSpaceBundleFuture)
+}}))
+FutureUtil.waitForAll(futures).then(map.entry.foreach((entry) -> {
+       entry.key.findBrokerServiceUrl().thenAccept(lookupResult -> {
+//Response will be address with success or redirect flag, and a list of 
topicnames in lookup request that belong to this bundle
+               lookupFuture.complete(newLookupResponse()))
+               resultFutures.add(lookupFuture)
+})
+}))
+
+return resultFutures;
+}
+```
+
+- TopicLookup.java(v2)
+       - Add lookupTopicBatchAsync
+
+- BinaryProto/HttpLookupService.java
+       - Add getBrokers/findBrokers

Reply via email to