This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4232dcb265285e15f6e87de2d78a9ba19324d64b Author: WJL3333 <[email protected]> AuthorDate: Mon Feb 8 12:54:13 2021 +0800 [Issue 9496] fix logic in ManagedLedgerWriter when config threadNum >= ledgerNum (#9497) Fix #9496 Fix the logic in ManagedLedgerWriter when the configured threadNum >= ledgerNum. ### Modifications If threadNum >= ledgerNum, the ledgers are allocated repeatedly among the threads so that every thread gets one. With the original logic a thread could be left without a ledger and hit an exception. ### Verifying this change Build and run `./pulsar-perf managed-ledger -e 3 -w 2 -o 10000 --threads 20 -r 100000 -s 2048 -zk localhost:2181`; no exception appears in stdlog. ### Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? (not applicable) (cherry picked from commit 07f35098ef779da849d7da4b225a840220a51101) --- .../pulsar/testclient/ManagedLedgerWriter.java | 44 +++++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 7cb3633..034f7f8 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -32,10 +32,7 @@ import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.DefaultThreadFactory; import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -213,8 +210,7 @@ public class ManagedLedgerWriter { Collections.shuffle(managedLedgers); AtomicBoolean isDone = new AtomicBoolean(); - List<List<ManagedLedger>> managedLedgersPerThread = Lists.partition(managedLedgers, - Math.max(1, managedLedgers.size() / 
arguments.numThreads)); + Map<Integer, List<ManagedLedger>> managedLedgersPerThread = allocateToThreads(managedLedgers, arguments.numThreads); for (int i = 0; i < arguments.numThreads; i++) { List<ManagedLedger> managedLedgersForThisThread = managedLedgersPerThread.get(i); @@ -334,6 +330,42 @@ public class ManagedLedgerWriter { factory.shutdown(); } + + public static <T> Map<Integer,List<T>> allocateToThreads(List<T> managedLedgers, int numThreads) { + + Map<Integer,List<T>> map = new HashMap<>(); + + if (managedLedgers.size() >= numThreads) { + int threadIndex = 0; + for (T managedLedger : managedLedgers) { + + List<T> ledgerList = map.getOrDefault(threadIndex, new ArrayList<>()); + ledgerList.add(managedLedger); + map.put(threadIndex, ledgerList); + + threadIndex++; + if (threadIndex >= numThreads) { + threadIndex = threadIndex % numThreads; + } + } + + } else { + int ledgerIndex = 0; + for(int threadIndex = 0;threadIndex<numThreads;threadIndex++) { + List<T> ledgerList = map.getOrDefault(threadIndex,new ArrayList<>()); + ledgerList.add(managedLedgers.get(ledgerIndex)); + map.put(threadIndex,ledgerList); + + ledgerIndex++; + if(ledgerIndex >= managedLedgers.size()) { + ledgerIndex = ledgerIndex % managedLedgers.size(); + } + } + } + + return map; + } + private static void printAggregatedStats() { Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
