This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 605868d  Improved efficiency in KeyShared dispatcher (#7104)
605868d is described below

commit 605868d64e018bc4fc075b4633423de0928e3d9a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 3 20:17:35 2020 -0700

    Improved efficiency in KeyShared dispatcher (#7104)
    
    Instead of grouping the messages by same key, group them directly by same 
consumer in order to achieve max possible grouping when sending them. Also 
reuse the map for messages grouping as a thread local.
---
 .../broker/service/AbstractBaseDispatcher.java     |   4 +
 ...ConsistentHashingStickyKeyConsumerSelector.java |   5 +-
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |   5 +-
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |   2 +-
 .../broker/service/StickyKeyConsumerSelector.java  |   7 --
 ...istentStickyKeyDispatcherMultipleConsumers.java |  78 ++++++++-------
 .../PersistentDispatcherSingleActiveConsumer.java  |   5 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 111 ++++++++++-----------
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |   5 +-
 9 files changed, 106 insertions(+), 116 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7985f30..6272a2c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -75,6 +75,10 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
 
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             Entry entry = entries.get(i);
+            if (entry == null) {
+                continue;
+            }
+
             ByteBuf metadataAndPayload = entry.getDataBuffer();
 
             MessageMetadata msgMetadata = 
Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index f27c7f9..377edae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -81,11 +81,8 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
 
     @Override
     public Consumer select(byte[] stickyKey) {
-        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
-    }
+        int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
 
-    @Override
-    public Consumer select(int hash) {
         rwLock.readLock().lock();
         try {
             if (hashRing.isEmpty()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 5c3c5b5..18b07f6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -103,10 +103,7 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector 
implements StickyKeyCon
 
     @Override
     public Consumer select(byte[] stickyKey) {
-        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
-    }
-
-    public Consumer select(int hash) {
+        int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
         if (rangeMap.size() > 0) {
             int slot = hash % rangeSize;
             return rangeMap.ceilingEntry(slot).getValue();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 21e94ba..dc96fbb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -63,7 +63,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelector 
implements StickyKeyCon
         return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
     }
 
-    public Consumer select(int hash) {
+    Consumer select(int hash) {
         if (rangeMap.size() > 0) {
             int slot = hash % rangeSize;
             Map.Entry<Integer, Consumer> ceilingEntry = 
rangeMap.ceilingEntry(slot);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 88852b5..1b168d5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -43,11 +43,4 @@ public interface StickyKeyConsumerSelector {
      * @return consumer
      */
     Consumer select(byte[] stickyKey);
-
-    /**
-     * Select a consumer by hash of the sticky they
-     * @param keyHash hash of sticky key
-     * @return
-     */
-    Consumer select(int keyHash);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index b01a432..32cce87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,13 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -25,15 +32,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends 
NonPersistentDispatcherMultipleConsumers {
 
@@ -62,37 +61,42 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
         return SubType.Key_Shared;
     }
 
+    private static final FastThreadLocal<Map<Consumer, List<Entry>>> 
localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
+        @Override
+        protected Map<Consumer, List<Entry>> initialValue() throws Exception {
+            return new HashMap<>();
+        }
+    };
+
     @Override
     public void sendMessages(List<Entry> entries) {
-        if (entries.size() > 0) {
-            final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
-            for (Entry entry : entries) {
-                int key = 
Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
-                groupedEntries.putIfAbsent(key, new ArrayList<>());
-                groupedEntries.get(key).add(entry);
-            }
-            final Iterator<Map.Entry<Integer, List<Entry>>> iterator = 
groupedEntries.entrySet().iterator();
-            while (iterator.hasNext()) {
-                final Map.Entry<Integer, List<Entry>> entriesWithSameKey = 
iterator.next();
-                //TODO: None key policy
-                Consumer consumer = 
selector.select(entriesWithSameKey.getKey());
-                if (consumer != null) {
-                    SendMessageInfo sendMessageInfo = 
SendMessageInfo.getThreadLocal();
-                    EntryBatchSizes batchSizes = 
EntryBatchSizes.get(entriesWithSameKey.getValue().size());
-                    filterEntriesForConsumer(entriesWithSameKey.getValue(), 
batchSizes, sendMessageInfo, null, null);
-                    consumer.sendMessages(entriesWithSameKey.getValue(), 
batchSizes, null, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
-                    TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, 
-sendMessageInfo.getTotalMessages());
-                } else {
-                    entries.forEach(entry -> {
-                        int totalMsgs = 
Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), 
subscription.toString(), -1);
-                        if (totalMsgs > 0) {
-                            msgDrop.recordEvent(totalMsgs);
-                        }
-                        entry.release();
-                    });
-                }
-            }
+        if (entries.isEmpty()) {
+            return;
+        }
+
+        if (consumerSet.isEmpty()) {
+            entries.forEach(Entry::release);
+            return;
+        }
+
+        final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
+        groupedEntries.clear();
+
+        for (Entry entry : entries) {
+            Consumer consumer = 
selector.select(peekStickyKey(entry.getDataBuffer()));
+            groupedEntries.computeIfAbsent(consumer, k -> new 
ArrayList<>()).add(entry);
+        }
+
+        for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : 
groupedEntries.entrySet()) {
+            Consumer consumer = entriesByConsumer.getKey();
+            List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+
+            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+            EntryBatchSizes batchSizes = 
EntryBatchSizes.get(entriesForConsumer.size());
+            filterEntriesForConsumer(entriesForConsumer, batchSizes, 
sendMessageInfo, null, null);
+            consumer.sendMessages(entriesForConsumer, batchSizes, null, 
sendMessageInfo.getTotalMessages(),
+                    sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
+            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, 
-sendMessageInfo.getTotalMessages());
         }
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 263bd7f..5bb7629 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -54,7 +54,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,8 +214,8 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
             Iterator<Entry> iterator = entries.iterator();
             while (iterator.hasNext()) {
                 Entry entry = iterator.next();
-                int keyHash = 
Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
-                Consumer consumer = stickyKeyConsumerSelector.select(keyHash);
+                byte[] key = peekStickyKey(entry.getDataBuffer());
+                Consumer consumer = stickyKeyConsumerSelector.select(key);
                 if (consumer == null || currentConsumer != consumer) {
                     iterator.remove();
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 6079d31..ce946fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,7 +38,6 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,59 +63,79 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         selector.removeConsumer(consumer);
     }
 
+    private static final FastThreadLocal<Map<Consumer, List<Entry>>> 
localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
+        @Override
+        protected Map<Consumer, List<Entry>> initialValue() throws Exception {
+            return new HashMap<>();
+        }
+    };
+
     @Override
     protected void sendMessagesToConsumers(ReadType readType, List<Entry> 
entries) {
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
+        int entriesCount = entries.size();
+
         // Trigger read more messages
-        if (entries.size() == 0) {
+        if (entriesCount == 0) {
             readMoreEntries();
             return;
         }
-        final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
-        for (Entry entry : entries) {
-            int key = 
Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
-            groupedEntries.putIfAbsent(key, new ArrayList<>());
-            groupedEntries.get(key).add(entry);
+
+        if (consumerSet.isEmpty()) {
+            entries.forEach(Entry::release);
+            cursor.rewind();
+            return;
         }
-        final Iterator<Map.Entry<Integer, List<Entry>>> iterator = 
groupedEntries.entrySet().iterator();
+
+        final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
+        groupedEntries.clear();
+
+        for (int i = 0; i < entriesCount; i++) {
+            Entry entry = entries.get(i);
+            Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
+            groupedEntries.computeIfAbsent(c, k -> new 
ArrayList<>()).add(entry);
+        }
+
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
-        while (iterator.hasNext() && totalAvailablePermits > 0 && 
isAtleastOneConsumerAvailable()) {
-            final Map.Entry<Integer, List<Entry>> entriesWithSameKey = 
iterator.next();
-            //TODO: None key policy
-            Consumer consumer = selector.select(entriesWithSameKey.getKey());
-            if (consumer == null) {
-                // Do nothing, cursor will be rewind at reconnection
-                log.info("[{}] rewind because no available consumer found for 
key {} from total {}", name,
-                        entriesWithSameKey.getKey(), consumerList.size());
-                entriesWithSameKey.getValue().forEach(Entry::release);
-                cursor.rewind();
-                return;
-            }
 
-            int availablePermits = consumer.isWritable() ? 
consumer.getAvailablePermits() : 1;
-            if (log.isDebugEnabled() && !consumer.isWritable()) {
-                log.debug("[{}-{}] consumer is not writable. dispatching only 
1 message to {} ", topic.getName(), name,
-                        consumer);
-            }
-            int messagesForC = Math.min(entriesWithSameKey.getValue().size(), 
availablePermits);
+        for (Map.Entry<Consumer, List<Entry>> current : 
groupedEntries.entrySet()) {
+            Consumer consumer = current.getKey();
+            List<Entry> entriesWithSameKey = current.getValue();
+            int entriesWithSameKeyCount = entriesWithSameKey.size();
+
+            int messagesForC = Math.min(entriesWithSameKeyCount, 
consumer.getAvailablePermits());
             if (log.isDebugEnabled()) {
-                log.debug("[{}] select consumer {} for key {} with messages 
num {}, read type is {}",
-                        name, consumer.consumerName(), 
entriesWithSameKey.getKey(), messagesForC, readType);
+                log.debug("[{}] select consumer {} with messages num {}, read 
type is {}",
+                        name, consumer.consumerName(), messagesForC, readType);
+            }
+
+            if (messagesForC < entriesWithSameKeyCount) {
+                // We are not able to push all the messages with given key to 
its consumer,
+                // so we discard for now and mark them for later redelivery
+                for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
+                    Entry entry = entriesWithSameKey.get(i);
+                    messagesToRedeliver.add(entry.getLedgerId(), 
entry.getEntryId());
+                    entry.release();
+                    entriesWithSameKey.set(i, null);
+                }
             }
+
             if (messagesForC > 0) {
                 // remove positions first from replay list first : 
sendMessages recycles entries
-                List<Entry> subList = new 
ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
                 if (readType == ReadType.Replay) {
-                    subList.forEach(entry -> 
messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()));
+                    for (int i = 0; i < messagesForC; i++) {
+                        Entry entry = entriesWithSameKey.get(i);
+                        messagesToRedeliver.remove(entry.getLedgerId(), 
entry.getEntryId());
+                    }
                 }
 
                 SendMessageInfo sendMessageInfo = 
SendMessageInfo.getThreadLocal();
-                EntryBatchSizes batchSizes = 
EntryBatchSizes.get(subList.size());
+                EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
                 EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get();
-                filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor);
+                filterEntriesForConsumer(entriesWithSameKey, batchSizes, 
sendMessageInfo, batchIndexesAcks, cursor);
 
-                consumer.sendMessages(subList, batchSizes, batchIndexesAcks, 
sendMessageInfo.getTotalMessages(),
+                consumer.sendMessages(entriesWithSameKey, batchSizes, 
batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
                         getRedeliveryTracker()).addListener(future -> {
                             if (future.isSuccess() && 
keyNumbers.decrementAndGet() == 0) {
@@ -123,17 +143,9 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                             }
                         });
 
-                for (int i = 0; i < messagesForC; i++) {
-                    entriesWithSameKey.getValue().remove(0);
-                }
-
                 TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, 
-(sendMessageInfo.getTotalMessages() - 
batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
-
-                if (entriesWithSameKey.getValue().size() == 0) {
-                    iterator.remove();
-                }
             }
         }
 
@@ -147,21 +159,6 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, 
totalBytesSent);
             }
         }
-
-        if (groupedEntries.size() > 0) {
-            int laterReplay = 0;
-            for (List<Entry> entryList : groupedEntries.values()) {
-                laterReplay += entryList.size();
-                entryList.forEach(entry -> {
-                    messagesToRedeliver.add(entry.getLedgerId(), 
entry.getEntryId());
-                    entry.release();
-                });
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] No consumers found with available permits, 
storing {} positions for later replay", name,
-                        laterReplay);
-            }
-        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99822e9..29b807a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -141,7 +141,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         }
 
         ArgumentCaptor<Integer> totalMessagesCaptor = 
ArgumentCaptor.forClass(Integer.class);
-        verify(consumerMock, times(2)).sendMessages(
+        verify(consumerMock, times(1)).sendMessages(
                 anyList(),
                 any(EntryBatchSizes.class),
                 any(EntryBatchIndexesAcks.class),
@@ -152,8 +152,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
 
         List<Integer> allTotalMessagesCaptor = 
totalMessagesCaptor.getAllValues();
-        Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 0);
-        Assert.assertEquals(allTotalMessagesCaptor.get(1).intValue(), 5);
+        Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
     }
 
     private ByteBuf createMessage(String message, int sequenceId) {

Reply via email to