lordcheng10 opened a new issue, #16782:
URL: https://github.com/apache/pulsar/issues/16782

   ## Motivation
   Bundle split currently supports three algorithms:
   - range_equally_divide
   - topic_count_equally_divide
   - specified_positions_divide
   
   However, none of these algorithms divides a bundle according to its traffic or QPS, so a bundle may have to be split several times before its load is evenly distributed.
   
   ## Goal
   Our goal is to split bundles according to traffic or QPS, so we propose a 
PIP to introduce a split algorithm based on traffic or QPS.
   
   The main idea is to collect the traffic and QPS of every topic in a bundle, and then split the bundle at the position that divides that traffic or QPS evenly.
   
   For example, consider a bundle with boundaries 0x00000000 to 0x00000200 that contains six topics: t1, t2, t3, t4, t5, t6.
   
   Step 1: Get their hash position and corresponding traffic or QPS:
   
   t1 with hashcode 10 msgRate 100/s throughput 1M/s
   
   t2 with hashcode 20 msgRate 200/s throughput 2M/s
   
   t3 with hashcode 80 msgRate 300/s throughput 3M/s
   
   t4 with hashcode 90 msgRate 400/s throughput 4M/s
   
   t5 with hashcode 100 msgRate 500/s throughput 5M/s
   
   t6 with hashcode 110 msgRate 2000/s throughput 190M/s
   
   
   
   Step 2: Calculate the total traffic and qps of the bundle:
   bundleMsgRate=3500
   bundleThroughput=205MB
   
   Step 3: Calculate the traffic and qps to split:
   splitBundleMsgRate=1750
   splitBundleThroughput=102.5MB
   
   
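   Step 4: Calculate the position to split and split. The cumulative msgRate (1500 after t5, 3500 after t6) and the cumulative throughput (15MB after t5, 205MB after t6) both first exceed half of the bundle total at t6, so the split position is the midpoint between the hash positions of t5 and t6:
   splitStartPosition=100
   splitEndPosition=110
   splitPosition=(100+110)/2=105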
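
The four steps above can be checked with a small self-contained sketch. This is only an illustration of the arithmetic, not the proposed Pulsar implementation: the class name `TrafficSplitExample`, the method `splitPosition`, and the use of parallel arrays are assumptions made for this example, and the input is assumed to be sorted by hash position already.

```java
// Illustrative sketch only; TrafficSplitExample is a hypothetical name, not a Pulsar class.
class TrafficSplitExample {

    // Returns the midpoint between the first topic whose running msgRate or
    // throughput exceeds half of the bundle total and the topic before it.
    // Assumes the three arrays are parallel and sorted by hash position.
    static long splitPosition(long[] hashes, double[] msgRates, double[] throughputs) {
        // Step 2: totals for the whole bundle.
        double bundleMsgRate = 0;
        double bundleThroughput = 0;
        for (int i = 0; i < hashes.length; i++) {
            bundleMsgRate += msgRates[i];
            bundleThroughput += throughputs[i];
        }

        // Steps 3 and 4: walk the topics until half of either total is passed.
        double msgRateSoFar = 0;
        double throughputSoFar = 0;
        for (int i = 0; i < hashes.length; i++) {
            msgRateSoFar += msgRates[i];
            throughputSoFar += throughputs[i];
            if (msgRateSoFar > bundleMsgRate / 2 || throughputSoFar > bundleThroughput / 2) {
                long splitStart = i > 0 ? hashes[i - 1] : 0;
                long splitEnd = hashes[i];
                return splitStart + (splitEnd - splitStart) / 2;
            }
        }
        // Reached only when the bundle carries no load at all.
        throw new IllegalStateException("no split position found");
    }

    public static void main(String[] args) {
        long[] hashes = {10, 20, 80, 90, 100, 110};
        double[] msgRates = {100, 200, 300, 400, 500, 2000};
        double[] throughputs = {1, 2, 3, 4, 5, 190}; // MB/s, as in the example
        System.out.println(splitPosition(hashes, msgRates, throughputs)); // prints 105
    }
}
```

With the six topics from the example, the threshold is first crossed at t6, so the result is the midpoint of 100 and 110.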
   
   
   
   ## API Changes
   
   Add a FlowOrQpsEquallyDivideBundleSplitAlgorithm class:
   
   /**
    * Split algorithm based on flow or qps.
    */
   public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
       @Override
       public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
         ...
       }
   }
   
   In the ServiceConfiguration class, update the default configuration 
corresponding to supportedNamespaceBundleSplitAlgorithms:
   
     private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList(
         "range_equally_divide", "topic_count_equally_divide",
         "specified_positions_divide", "flow_count_equally_divide");
   
   ## Implementation
   
   The FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary method executes the following steps:
   1. Get the hash position of each topic together with its msgRate and msgThroughput, and sort the topics by hash position in ascending order:
   List<Long> topicNameHashList = new ArrayList<>(topics.size());
   Map<Long, Double> hashAndMsgMap = new HashMap<>();
   Map<Long, Double> hashAndThroughput = new HashMap<>();
   
   2. Traverse the topics in ascending order of hash position to find the position that divides the bundle's traffic or QPS roughly evenly:
   // bundleMsgRate and bundleThroughput hold the totals for the whole bundle.
   List<Long> splitResults = new ArrayList<>();
   double bundleMsgRateTmp = 0;
   double bundleThroughputTmp = 0;
   for (int i = 0; i < topicNameHashList.size(); i++) {
       long topicHashCode = topicNameHashList.get(i);
       bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
       bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
   
       // Stop at the first topic where either running total passes half of the bundle total.
       if (bundleMsgRateTmp > bundleMsgRate / 2 || bundleThroughputTmp > bundleThroughput / 2) {
           // Split halfway between this topic and the previous one.
           long splitStart = i > 0 ? topicNameHashList.get(i - 1) : 0;
           long splitEnd = topicHashCode;
           long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
           splitResults.add(splitMiddle);
           break;
       }
   }
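
Step 1 above only shows the declarations. A minimal sketch of how those three collections and the bundle totals might be populated and sorted follows. The holder class `BundleLoadIndex` and its methods are hypothetical names introduced for illustration, the hash and load values are assumed to be supplied by the caller, and merging the load of topics whose hashes collide is an assumption of this sketch rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Pulsar.
class BundleLoadIndex {
    final List<Long> topicNameHashList = new ArrayList<>();
    final Map<Long, Double> hashAndMsgMap = new HashMap<>();
    final Map<Long, Double> hashAndThroughput = new HashMap<>();
    double bundleMsgRate = 0;
    double bundleThroughput = 0;

    // Records one topic's hash position and load.
    // Assumption: topics that share a hash position have their load summed.
    void addTopic(long hash, double msgRate, double throughput) {
        if (!hashAndMsgMap.containsKey(hash)) {
            topicNameHashList.add(hash);
        }
        hashAndMsgMap.merge(hash, msgRate, Double::sum);
        hashAndThroughput.merge(hash, throughput, Double::sum);
        bundleMsgRate += msgRate;
        bundleThroughput += throughput;
    }

    // Orders the hash positions from small to large, as step 2 expects.
    void sortByHash() {
        Collections.sort(topicNameHashList);
    }
}
```

After `sortByHash()`, the fields have the names and shapes used by the traversal in step 2, and `bundleMsgRate` and `bundleThroughput` hold the totals that the loop compares against.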

