This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git
The following commit(s) were added to refs/heads/master by this push:
new dd7aedf Created PIP 35: Improve topic lookup for topics that have high number of partitions (markdown)
dd7aedf is described below
commit dd7aedf3a304529c0fd2637fdc0022dfec288ea8
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Apr 11 14:17:10 2019 +0800
Created PIP 35: Improve topic lookup for topics that have high number of partitions (markdown)
---
...r-topics-that-have-high-number-of-partitions.md | 112 +++++++++++++++++++++
1 file changed, 112 insertions(+)
diff --git a/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md b/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
new file mode 100644
index 0000000..09ba62d
--- /dev/null
+++ b/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
@@ -0,0 +1,112 @@
+ * **Status**: Proposal
+ * **Author**: Zhengxin Cai
+ * **Pull Request**:
+ * **Mailing List discussion**: http://mail-archives.apache.org/mod_mbox/pulsar-dev/201904.mbox/%3CCAHQi0xemFAGC6P9e240-8Ho0z4qtUv0uSBLtHfSs5ZeBDJXEug%40mail.gmail.com%3E
+
+## Motivation
+
+As this [GitHub issue](https://github.com/apache/pulsar/issues/1088) points out, for a partitioned topic the client has to send a separate lookup request for each partition. Here we propose a way to make lookups easier for partitioned topics.
+
+One option is to make the current lookup API (`getBroker`) smarter, so that it checks whether a topic is partitioned. For a normal topic it returns the usual lookup response, a `Pair<InetSocketAddress, InetSocketAddress>`; for a partitioned topic it returns a `List<Pair<InetSocketAddress, InetSocketAddress>>` containing all the brokers serving that partitioned topic.
+
+Alternatively, we could create a new batch lookup API, such as `getBrokers`. It takes a list of topics as input, where each topic must be either a non-partitioned topic or a partition of a partitioned topic, and returns a `Map<TopicName, Pair<InetSocketAddress, InetSocketAddress>>` that maps each input topic to its broker address.
+
+We prefer the second approach, because it supports looking up all partitions of a partitioned topic and also supports bulk lookup of many normal topics in a single request.
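A minimal sketch of the preferred batch API, assuming hypothetical names (`BatchTopicLookup`, `BrokerAddress`, `InMemoryBatchTopicLookup`) and plain topic-name strings in place of `TopicName`. `BrokerAddress` stands in for the `Pair<InetSocketAddress, InetSocketAddress>` returned by a single lookup; the in-memory implementation exists only to show that partitions and normal topics resolve in one call.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical holder for the pair of broker addresses returned by one lookup. */
final class BrokerAddress {
    final InetSocketAddress logical;
    final InetSocketAddress physical;

    BrokerAddress(InetSocketAddress logical, InetSocketAddress physical) {
        this.logical = logical;
        this.physical = physical;
    }
}

/** Hypothetical shape of the proposed batch lookup API (not an existing Pulsar interface). */
interface BatchTopicLookup {
    /** Each input must be a non-partitioned topic or a single partition of a partitioned topic. */
    CompletableFuture<Map<String, BrokerAddress>> getBrokers(List<String> topics);

    /** Expands a partitioned topic into its partition names: "<topic>-partition-<i>". */
    static List<String> partitionsOf(String topic, int numPartitions) {
        List<String> partitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(topic + "-partition-" + i);
        }
        return partitions;
    }
}

/** In-memory stand-in that illustrates the contract: one call, one entry per requested topic. */
final class InMemoryBatchTopicLookup implements BatchTopicLookup {
    private final Map<String, BrokerAddress> ownership;

    InMemoryBatchTopicLookup(Map<String, BrokerAddress> ownership) {
        this.ownership = ownership;
    }

    @Override
    public CompletableFuture<Map<String, BrokerAddress>> getBrokers(List<String> topics) {
        Map<String, BrokerAddress> result = new LinkedHashMap<>();
        for (String topic : topics) {
            BrokerAddress address = ownership.get(topic);
            if (address == null) {
                // Fail the whole batch if any topic is unknown (a choice made for this sketch)
                return CompletableFuture.failedFuture(
                        new IllegalArgumentException("Unknown topic: " + topic));
            }
            result.put(topic, address);
        }
        return CompletableFuture.completedFuture(result);
    }
}
```

Because each partition is just another topic name to this API, a partitioned topic with N partitions needs one request instead of N, and it can share that request with unrelated normal topics.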
+
+## Proposal
+
+The core lookup logic lives in `TopicLookupBase`.
+Add a new method, `lookupTopicBatchAsync`. After validation, it first finds the bundle each topic belongs to and groups the topics into a `Map<Bundle, List<TopicName>>`, then invokes `NamespaceService.findBrokerServiceUrl` once for each bundle.
+Finally, it iterates through the map and creates a response for each topic name.
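The grouping step can be illustrated with a self-contained sketch. `bundleOf` and `findBrokerServiceUrl` here are hypothetical function parameters standing in for the namespace bundle resolution and `NamespaceService.findBrokerServiceUrl`; the point is that the number of broker lookups equals the number of distinct bundles, not the number of topics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Sketch of the broker-side batch step: group topics by bundle, then one lookup per bundle. */
final class BundleGroupedLookup {
    /** One response per bundle: the owning broker plus the requested topics in that bundle. */
    static final class BundleResponse {
        final String brokerUrl;
        final List<String> topics;

        BundleResponse(String brokerUrl, List<String> topics) {
            this.brokerUrl = brokerUrl;
            this.topics = topics;
        }
    }

    private final Function<String, String> bundleOf; // hypothetical: topic -> bundle
    private final Function<String, CompletableFuture<String>> findBrokerServiceUrl; // hypothetical: bundle -> broker

    BundleGroupedLookup(Function<String, String> bundleOf,
                        Function<String, CompletableFuture<String>> findBrokerServiceUrl) {
        this.bundleOf = bundleOf;
        this.findBrokerServiceUrl = findBrokerServiceUrl;
    }

    CompletableFuture<List<BundleResponse>> lookupTopicBatchAsync(List<String> topics) {
        // Group the requested topics by bundle, preserving request order
        Map<String, List<String>> topicsByBundle = new LinkedHashMap<>();
        for (String topic : topics) {
            topicsByBundle.computeIfAbsent(bundleOf.apply(topic), b -> new ArrayList<>()).add(topic);
        }
        // Issue exactly one broker lookup per distinct bundle
        List<CompletableFuture<BundleResponse>> futures = new ArrayList<>();
        topicsByBundle.forEach((bundle, bundleTopics) ->
                futures.add(findBrokerServiceUrl.apply(bundle)
                        .thenApply(url -> new BundleResponse(url, bundleTopics))));
        // Wait for every lookup, then collect the per-bundle responses in order
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```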
+
+In `LookupService`, when `findBrokers` invokes `newBatchLookup`, it gets back a list of lookup responses, some of which may contain a redirect. For those responses, `findBrokers` is invoked again to contact the correct cluster. The results of the two invocations are combined into the final result.
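A sketch of the client-side redirect handling, assuming a hypothetical `newBatchLookup` transport and a hypothetical `BatchLookupResult` carrying a broker URL, a redirect flag and the topics it covers. The `MAX_REDIRECTS` bound is an added assumption to guard against redirect loops; it is not part of the proposal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Sketch of client-side findBrokers for batch lookups, following redirects. */
final class BatchRedirectResolver {
    /** Hypothetical per-bundle result: broker URL, redirect flag, and the topics it answers. */
    static final class BatchLookupResult {
        final String brokerUrl;
        final boolean redirect;
        final List<String> topics;

        BatchLookupResult(String brokerUrl, boolean redirect, List<String> topics) {
            this.brokerUrl = brokerUrl;
            this.redirect = redirect;
            this.topics = topics;
        }
    }

    private static final int MAX_REDIRECTS = 20; // assumed bound; the value is arbitrary

    // Hypothetical transport: sends one batch lookup for the given topics to the given broker
    private final BiFunction<String, List<String>, CompletableFuture<List<BatchLookupResult>>> newBatchLookup;

    BatchRedirectResolver(BiFunction<String, List<String>, CompletableFuture<List<BatchLookupResult>>> newBatchLookup) {
        this.newBatchLookup = newBatchLookup;
    }

    /** Resolves the owning broker URL of every topic, starting from the given broker. */
    CompletableFuture<Map<String, String>> findBrokers(String brokerUrl, List<String> topics) {
        return findBrokers(brokerUrl, topics, 0);
    }

    private CompletableFuture<Map<String, String>> findBrokers(String brokerUrl, List<String> topics, int depth) {
        if (depth > MAX_REDIRECTS) {
            return CompletableFuture.failedFuture(new IllegalStateException("Too many redirects"));
        }
        return newBatchLookup.apply(brokerUrl, topics).thenCompose(results -> {
            Map<String, String> resolved = new LinkedHashMap<>();
            List<CompletableFuture<Map<String, String>>> redirected = new ArrayList<>();
            for (BatchLookupResult result : results) {
                if (result.redirect) {
                    // Ask the broker we were redirected to, for just these topics
                    redirected.add(findBrokers(result.brokerUrl, result.topics, depth + 1));
                } else {
                    result.topics.forEach(topic -> resolved.put(topic, result.brokerUrl));
                }
            }
            // Merge the direct answers with the answers from the redirected lookups
            return CompletableFuture.allOf(redirected.toArray(new CompletableFuture<?>[0]))
                    .thenApply(v -> {
                        redirected.forEach(f -> resolved.putAll(f.join()));
                        return resolved;
                    });
        });
    }
}
```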
+
+Here are the proposed changes to the protocol buffer definitions and APIs:
+
+*PulsarApi.proto*
+
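+```
+// Add to enum BaseCommand.Type
+BATCH_LOOKUP          = 23;
+BATCH_LOOKUP_RESPONSE = 24;
+
+// Add to message BaseCommand
+optional CommandBatchLookupTopic         batchLookupTopic         = 23;
+optional CommandBatchLookupTopicResponse batchLookupTopicResponse = 24;
+
+// Note: 23 and 24 are already used by LOOKUP/LOOKUP_RESPONSE and by the
+// lookupTopic/lookupTopicResponse fields, so unused numbers must be chosen.
+
+message CommandBatchLookupTopic {
+    repeated string topics     = 1;
+    required uint64 request_id = 2;
+}
+
+// Lets each per-bundle response list the requested topics it covers
+message CommandTopicResponse {
+    repeated string topicname = 10;
+}
+```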
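The `request_id` field lets the client match each `BATCH_LOOKUP_RESPONSE` to the request that produced it. A minimal sketch of that bookkeeping, using a hypothetical `PendingBatchLookups` class rather than any existing Pulsar class:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical client-side table of in-flight batch lookups, keyed by request_id. */
final class PendingBatchLookups<R> {
    private final AtomicLong nextRequestId = new AtomicLong();
    private final Map<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Allocates a request_id for a new batch lookup and remembers the future to complete. */
    long newRequest(CompletableFuture<R> future) {
        long requestId = nextRequestId.incrementAndGet();
        pending.put(requestId, future);
        return requestId;
    }

    /** Completes the matching request; returns false for unknown or already-answered ids. */
    boolean complete(long requestId, R response) {
        CompletableFuture<R> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    /** Fails the matching request, for example when the connection is closed. */
    boolean fail(long requestId, Throwable cause) {
        CompletableFuture<R> future = pending.remove(requestId);
        return future != null && future.completeExceptionally(cause);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Removing the entry on completion means a duplicate or late response for the same id is simply ignored.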
+
+- Commands.java
+ - Add newBatchLookup
+
+- ClientCnx.java
+ - Add newBatchLookup/handleBatchLookupResponse
+
+- ServerCnx.java
+ - Add handleBatchLookup
+
+- LookupProxyHandler.java
+ - Add handleBatchLookup
+
+- ProxyConnection.java
+ - Add handleBatchLookup
+
+- PulsarDecoder.java
+ - Handle batchLookup
+
+- Add BatchLookupResult.java
+
+- TopicLookupBase.java
+ - Add lookupTopicBatchAsync
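+```
+// Sketch: the validation helper, bundle lookup and response builder stand for the
+// existing TopicLookupBase/NamespaceService logic; signatures are simplified.
+CompletableFuture<List<ByteBuf>> lookupTopicBatchAsync(List<TopicName> topics, long requestId) {
+    Map<NamespaceBundle, List<TopicName>> topicsByBundle = new ConcurrentHashMap<>();
+    List<CompletableFuture<ByteBuf>> resultFutures = new CopyOnWriteArrayList<>();
+    List<CompletableFuture<Void>> bundleFutures = new ArrayList<>();
+
+    for (TopicName topic : topics) {
+        CompletableFuture<Void> bundleFuture = getClusterDataIfDifferentCluster(topic)
+            .thenCompose(validationFailureResponse -> {
+                if (validationFailureResponse != null) {
+                    // Validation failed (e.g. topic belongs to another cluster): answer it directly
+                    resultFutures.add(CompletableFuture.completedFuture(validationFailureResponse));
+                    return CompletableFuture.<Void>completedFuture(null);
+                }
+                // Find the bundle the topic belongs to and group the topic under it
+                return pulsarService.getNamespaceService().getBundleAsync(topic)
+                    .thenAccept(bundle -> topicsByBundle
+                        .computeIfAbsent(bundle, b -> new CopyOnWriteArrayList<>())
+                        .add(topic));
+            });
+        bundleFutures.add(bundleFuture);
+    }
+
+    // Once every topic is grouped, look up the owning broker once per bundle
+    return FutureUtil.waitForAll(bundleFutures).thenCompose(ignored -> {
+        topicsByBundle.forEach((bundle, bundleTopics) -> resultFutures.add(
+            findBrokerServiceUrl(bundle).thenApply(lookupResult ->
+                // Response carries the address with a success or redirect flag, plus the
+                // names of the requested topics that belong to this bundle
+                newLookupResponse(lookupResult, bundleTopics, requestId))));
+        return FutureUtil.waitForAll(resultFutures).thenApply(v -> resultFutures.stream()
+            .map(CompletableFuture::join)
+            .collect(Collectors.toList()));
+    });
+}
+```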
+
+- TopicLookup.java (v2)
+ - Add lookupTopicBatchAsync
+
+- BinaryProtoLookupService.java / HttpLookupService.java
+ - Add getBrokers/findBrokers
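The handler additions listed above follow one pattern: the decoder routes each new command type to a handler, and each side overrides only the handlers it serves (broker and proxy for `BATCH_LOOKUP`, client for `BATCH_LOOKUP_RESPONSE`). A toy sketch of that routing, with hypothetical `CommandKind`, `DecodedCommand` and `DecoderSketch` types that are not Pulsar's actual `PulsarDecoder`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical subset of command types; the real enum lives in PulsarApi.proto. */
enum CommandKind { LOOKUP, BATCH_LOOKUP, BATCH_LOOKUP_RESPONSE }

/** Hypothetical decoded command: its type, request id and the topics it carries. */
final class DecodedCommand {
    final CommandKind kind;
    final long requestId;
    final List<String> topics;

    DecodedCommand(CommandKind kind, long requestId, List<String> topics) {
        this.kind = kind;
        this.requestId = requestId;
        this.topics = topics;
    }
}

/** Routes each command to a handler; subclasses override only what their side serves. */
abstract class DecoderSketch {
    final void dispatch(DecodedCommand cmd) {
        switch (cmd.kind) {
            case BATCH_LOOKUP:
                handleBatchLookup(cmd);
                break;
            case BATCH_LOOKUP_RESPONSE:
                handleBatchLookupResponse(cmd);
                break;
            default:
                throw new UnsupportedOperationException("Not covered by this sketch: " + cmd.kind);
        }
    }

    /** Broker (ServerCnx) and proxy (ProxyConnection) sides would override this. */
    protected void handleBatchLookup(DecodedCommand cmd) {
        throw new UnsupportedOperationException("BATCH_LOOKUP not supported here");
    }

    /** Client side (ClientCnx) would override this. */
    protected void handleBatchLookupResponse(DecodedCommand cmd) {
        throw new UnsupportedOperationException("BATCH_LOOKUP_RESPONSE not supported here");
    }
}

/** Toy broker-side connection that records the batch lookups it receives. */
final class RecordingServerCnx extends DecoderSketch {
    final List<Long> seenRequestIds = new ArrayList<>();

    @Override
    protected void handleBatchLookup(DecodedCommand cmd) {
        seenRequestIds.add(cmd.requestId);
    }
}
```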