lordcheng10 opened a new issue, #16782:
URL: https://github.com/apache/pulsar/issues/16782
## Motivation
Bundle split currently supports three algorithms:
- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide
However, none of these algorithms divides a bundle according to its traffic or
QPS. A split can therefore leave most of the load in one of the new bundles,
which may then have to be split again.
## Goal
Our goal is to split bundles according to traffic or QPS, so we propose a
PIP to introduce a split algorithm based on traffic or QPS.
The main idea is to get the traffic or QPS of each topic contained in a
bundle, and then split at the position that divides the traffic or QPS evenly.
For example, take a bundle with boundaries 0x00000000 to 0x00000200 and
six topics: t1, t2, t3, t4, t5, t6.
Step 1: Get their hash position and corresponding traffic or QPS:
t1 with hashcode 10 msgRate 100/s throughput 1MB/s
t2 with hashcode 20 msgRate 200/s throughput 2MB/s
t3 with hashcode 80 msgRate 300/s throughput 3MB/s
t4 with hashcode 90 msgRate 400/s throughput 4MB/s
t5 with hashcode 100 msgRate 500/s throughput 5MB/s
t6 with hashcode 110 msgRate 2000/s throughput 190MB/s
Step 2: Calculate the total traffic and QPS of the bundle:
bundleMsgRate=3500/s
bundleThroughput=205MB/s
Step 3: Calculate the traffic and QPS at which to split (half of each total):
splitBundleMsgRate=1750/s
splitBundleThroughput=102.5MB/s
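Step 4: Calculate the split position and split. Walking the topics in hash
order, the cumulative msgRate is 1500/s after t5 and 3500/s after t6, so the
1750/s mark is crossed at t6 (the throughput mark of 102.5MB/s is crossed at
t6 as well). The split position is the midpoint between t5 and t6:
splitStartPosition=100
splitEndPosition=110
splitPosition=(100+110)/2=105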
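The four steps above can be checked with a small Java sketch. The names `SplitExample`, `splitPosition` and `exampleSplit` are illustrative only and are not part of Pulsar; the loop assumes the topics are already sorted by hash position and uses the same "either threshold crossed" rule as the Implementation section.

```java
final class SplitExample {

    /**
     * Returns the midpoint between the first topic whose cumulative load
     * exceeds half of the bundle total and the topic just before it.
     * Arrays must be sorted by hash position and have equal length.
     */
    static long splitPosition(long[] hashes, double[] msgRates, double[] throughputs) {
        // Step 2: totals for the whole bundle.
        double totalRate = 0;
        double totalThroughput = 0;
        for (int i = 0; i < hashes.length; i++) {
            totalRate += msgRates[i];
            totalThroughput += throughputs[i];
        }
        // Steps 3 and 4: walk in hash order until half of either total is exceeded.
        double rateSoFar = 0;
        double throughputSoFar = 0;
        for (int i = 0; i < hashes.length; i++) {
            rateSoFar += msgRates[i];
            throughputSoFar += throughputs[i];
            if (rateSoFar > totalRate / 2 || throughputSoFar > totalThroughput / 2) {
                long start = i > 0 ? hashes[i - 1] : 0;
                long end = hashes[i];
                return start + (end - start) / 2;
            }
        }
        return -1; // no topics, or no load at all: nothing to split on
    }

    /** The six topics from the example (t1..t6). */
    static long exampleSplit() {
        long[] hashes = {10, 20, 80, 90, 100, 110};
        double[] msgRates = {100, 200, 300, 400, 500, 2000};
        double[] throughputs = {1, 2, 3, 4, 5, 190};
        return splitPosition(hashes, msgRates, throughputs);
    }
}
```

With the example data, `SplitExample.exampleSplit()` lands between t5 and t6, matching the hand calculation.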
## API Changes
Add a FlowOrQpsEquallyDivideBundleSplitAlgorithm class:
    /**
     * Split algorithm based on flow or qps.
     */
    public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
        @Override
        public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
            ...
        }
    }
In the ServiceConfiguration class, update the default value of
supportedNamespaceBundleSplitAlgorithms to include the new algorithm:
    private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList(
            "range_equally_divide", "topic_count_equally_divide",
            "specified_positions_divide", "flow_count_equally_divide");
## Implementation
The FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary method
executes the following steps:
1. Get the hash position of each topic together with its msgRate and
msgThroughput, and sort the positions in ascending order:
    List<Long> topicNameHashList = new ArrayList<>(topics.size());
    Map<Long, Double> hashAndMsgMap = new HashMap<>();
    Map<Long, Double> hashAndThroughput = new HashMap<>();
2. Traverse the topic positions in ascending order to find the position that
divides the bundle's traffic or QPS roughly evenly:
    // bundleMsgRate and bundleThroughput are the totals for the whole bundle;
    // splitResults collects the split positions.
    double bundleMsgRateTmp = 0;
    double bundleThroughputTmp = 0;
    for (int i = 0; i < topicNameHashList.size(); i++) {
        long topicHashCode = topicNameHashList.get(i);
        bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
        bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
        // Stop at the first topic that pushes either running sum past half of its total.
        if (bundleMsgRateTmp > bundleMsgRate / 2 || bundleThroughputTmp > bundleThroughput / 2) {
            // Split midway between this topic and the previous one.
            long splitStart = i > 0 ? topicNameHashList.get(i - 1) : 0;
            long splitEnd = topicHashCode;
            long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
            splitResults.add(splitMiddle);
            break;
        }
    }
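Step 1 above only shows the declarations. One way the three collections might be populated is sketched below; this is an illustration under stated assumptions, not the proposed implementation. `TopicLoadIndex`, `build`, the `stats` map (topic name to `{msgRate, msgThroughput}`) and the `hashFn` parameter are all hypothetical stand-ins for however the broker obtains topic hashes and load statistics.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

final class TopicLoadIndex {
    // Same three collections as in step 1.
    final List<Long> topicNameHashList = new ArrayList<>();
    final Map<Long, Double> hashAndMsgMap = new HashMap<>();
    final Map<Long, Double> hashAndThroughput = new HashMap<>();

    /**
     * stats maps a topic name to {msgRate, msgThroughput} (assumed layout).
     * hashFn is a stand-in for the broker's topic hash function.
     */
    static TopicLoadIndex build(Map<String, double[]> stats, ToLongFunction<String> hashFn) {
        TopicLoadIndex index = new TopicLoadIndex();
        for (Map.Entry<String, double[]> entry : stats.entrySet()) {
            long hash = hashFn.applyAsLong(entry.getKey());
            // Record each hash position once, even if several topics share it.
            if (!index.hashAndMsgMap.containsKey(hash)) {
                index.topicNameHashList.add(hash);
            }
            // Topics that collide on the same position have their load summed.
            index.hashAndMsgMap.merge(hash, entry.getValue()[0], Double::sum);
            index.hashAndThroughput.merge(hash, entry.getValue()[1], Double::sum);
        }
        // Ascending order, as the traversal in step 2 expects.
        Collections.sort(index.topicNameHashList);
        return index;
    }
}
```

Summing the load of topics that share a hash position is a design choice of this sketch: the maps are keyed by position, so a plain `put` would silently drop the load of all but one colliding topic.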