This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4232dcb265285e15f6e87de2d78a9ba19324d64b
Author: WJL3333 <[email protected]>
AuthorDate: Mon Feb 8 12:54:13 2021 +0800

    [Issue 9496] fix logic in ManagedLedgerWriter when config threadNum >= 
ledgerNum (#9497)
    
    Fix #9496
    
    fix logic in ManagedLedgerWriter when config threadNum >= ledgerNum
    
    ### Modifications
    
    if threadNum >= ledgerNum.
    
    allocate ledger repeat among threads.
    
    origin logic may have thread without ledger and got an exception.
    
    ### Verifying this change
    
    build and run
    `./pulsar-perf managed-ledger -e 3 -w 2 -o 10000 --threads 20 -r 100000 -s 
2048 -zk localhost:2181`
    
    no exception in stdlog
    
    ### Documentation
    
      - Does this pull request introduce a new feature?   no
      - If yes, how is the feature documented? (not applicable)
    
    (cherry picked from commit 07f35098ef779da849d7da4b225a840220a51101)
---
 .../pulsar/testclient/ManagedLedgerWriter.java     | 44 +++++++++++++++++++---
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 7cb3633..034f7f8 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -32,10 +32,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -213,8 +210,7 @@ public class ManagedLedgerWriter {
         Collections.shuffle(managedLedgers);
         AtomicBoolean isDone = new AtomicBoolean();
 
-        List<List<ManagedLedger>> managedLedgersPerThread = 
Lists.partition(managedLedgers,
-                Math.max(1, managedLedgers.size() / arguments.numThreads));
+        Map<Integer, List<ManagedLedger>> managedLedgersPerThread = 
allocateToThreads(managedLedgers, arguments.numThreads);
 
         for (int i = 0; i < arguments.numThreads; i++) {
             List<ManagedLedger> managedLedgersForThisThread = 
managedLedgersPerThread.get(i);
@@ -334,6 +330,42 @@ public class ManagedLedgerWriter {
         factory.shutdown();
     }
 
+
+    public static <T> Map<Integer,List<T>> allocateToThreads(List<T> 
managedLedgers, int numThreads) {
+
+        Map<Integer,List<T>> map = new HashMap<>();
+
+        if (managedLedgers.size() >= numThreads) {
+            int threadIndex = 0;
+            for (T managedLedger : managedLedgers) {
+
+                List<T> ledgerList = map.getOrDefault(threadIndex, new 
ArrayList<>());
+                ledgerList.add(managedLedger);
+                map.put(threadIndex, ledgerList);
+
+                threadIndex++;
+                if (threadIndex >= numThreads) {
+                    threadIndex = threadIndex % numThreads;
+                }
+            }
+
+        } else {
+            int ledgerIndex = 0;
+            for(int threadIndex = 0;threadIndex<numThreads;threadIndex++) {
+                List<T> ledgerList = map.getOrDefault(threadIndex,new 
ArrayList<>());
+                ledgerList.add(managedLedgers.get(ledgerIndex));
+                map.put(threadIndex,ledgerList);
+
+                ledgerIndex++;
+                if(ledgerIndex >= managedLedgers.size()) {
+                    ledgerIndex = ledgerIndex % managedLedgers.size();
+                }
+            }
+        }
+
+        return map;
+    }
+
     private static void printAggregatedStats() {
         Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
 

Reply via email to