This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dde417cbc2d101478a8159dfbd8db59833cd9b55
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jan 26 16:05:16 2022 +0800

    Fix batch message ack does not decrease the unacked-msg count. (#13383)
    
    (cherry picked from commit d64f2916e0feee1b09459b4db6094db1143067a1)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 129 ++++++++++++++++++---
 .../pulsar/broker/service/BatchMessageTest.java    |   2 +-
 .../BatchMessageWithBatchIndexLevelTest.java       | 108 +++++++++++++++++
 .../PersistentDispatcherFailoverConsumerTest.java  |  79 ++++++-------
 4 files changed, 265 insertions(+), 53 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4f886d3..cf9f5bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,6 +128,7 @@ public class Consumer {
     private PositionImpl readPositionWhenJoining;
     private final String clientAddress; // IP address only, no port number 
included
     private final MessageId startMessageId;
+    private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
 
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
@@ -182,6 +185,8 @@ public class Consumer {
         }
 
         this.clientAddress = cnx.clientSourceAddress();
+        this.isAcknowledgmentAtBatchIndexLevelEnabled = 
subscription.getTopic().getBrokerService()
+                
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
     }
 
     public SubType subType() {
@@ -233,7 +238,7 @@ public class Consumer {
             writePromise.setSuccess(null);
             return writePromise;
         }
-
+        int unackedMessages = totalMessages;
         // Note
         // Must ensure that the message is written to the pendingAcks before 
sent is first , because this consumer
         // is possible to disconnect at this time.
@@ -243,11 +248,16 @@ public class Consumer {
                 if (entry != null) {
                     int batchSize = batchSizes.getBatchSize(i);
                     int stickyKeyHash = getStickyKeyHash(entry);
+                    long[] ackSet = 
getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                    if (ackSet != null) {
+                        unackedMessages -= (batchSize - 
BitSet.valueOf(ackSet).cardinality());
+                    }
                     pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), 
batchSize, stickyKeyHash);
-                    if (log.isDebugEnabled()){
+                    if (log.isDebugEnabled()) {
                         log.debug("[{}-{}] Added {}:{} ledger entry with 
batchSize of {} to pendingAcks in"
                                         + " broker.service.Consumer for 
consumerId: {}",
-                             topicName, subscription, entry.getLedgerId(), 
entry.getEntryId(), batchSize, consumerId);
+                                topicName, subscription, entry.getLedgerId(), 
entry.getEntryId(), batchSize,
+                                consumerId);
                     }
                 }
             }
@@ -267,7 +277,7 @@ public class Consumer {
                             + " for consumerId: {}; avgMessagesPerEntry is {}",
                    topicName, subscription, ackedCount, totalMessages, 
consumerId, tmpAvgMessagesPerEntry);
         }
-        incrementUnackedMessages(totalMessages);
+        incrementUnackedMessages(unackedMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
         msgOutCounter.add(totalMessages);
         bytesOutCounter.add(totalBytes);
@@ -381,16 +391,19 @@ public class Consumer {
     //this method is for individual ack not carry the transaction
     private CompletableFuture<Void> individualAckNormal(CommandAck ack, 
Map<String, Long> properties) {
         List<Position> positionsAcked = new ArrayList<>();
-
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
+            long ackedCount = 0;
+            long batchSize = getBatchSize(msgId);
+            Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId(), ackSets);
+                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, 
batchSize, ackSets);
                 if (isTransactionEnabled()) {
                     //sync the batch position bit set point, in order to 
delete the position in pending acks
                     if (Subscription.isIndividualAckMode(subType)) {
@@ -400,7 +413,20 @@ public class Consumer {
                 }
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
+                if (isAcknowledgmentAtBatchIndexLevelEnabled) {
+                    long[] cursorAckSet = getCursorAckSet(position);
+                    if (cursorAckSet != null) {
+                        ackedCount = batchSize - 
BitSet.valueOf(cursorAckSet).cardinality();
+                    } else {
+                        ackedCount = batchSize;
+                    }
+                } else {
+                    ackedCount = batchSize;
+                }
             }
+
+            addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+
             positionsAcked.add(position);
 
             checkCanRemovePendingAcksAndHandle(position, msgId);
@@ -430,7 +456,6 @@ public class Consumer {
     private CompletableFuture<Void> individualAckWithTransaction(CommandAck 
ack) {
         // Individual ack
         List<MutablePair<PositionImpl, Integer>> positionsAcked = new 
ArrayList<>();
-
         if (!isTransactionEnabled()) {
             return FutureUtil.failedFuture(
                     new BrokerServiceException.NotAllowedException("Server 
don't support transaction ack!"));
@@ -476,6 +501,56 @@ public class Consumer {
         return completableFuture;
     }
 
+    private long getBatchSize(MessageIdData msgId) {
+        long batchSize = 1;
+        if (Subscription.isIndividualAckMode(subType)) {
+            LongPair longPair = pendingAcks.get(msgId.getLedgerId(), 
msgId.getEntryId());
+            // A consumer may ack a message that does not belong to it.
+            if (longPair == null) {
+                Consumer ackOwnerConsumer = 
getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+                longPair = 
ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
+                if (longPair != null) {
+                    batchSize = longPair.first;
+                }
+            } else {
+                batchSize = longPair.first;
+            }
+        }
+        return batchSize;
+    }
+
+    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, 
long batchSize, long[] ackSets) {
+        long ackedCount = 0;
+        if (isAcknowledgmentAtBatchIndexLevelEnabled) {
+            long[] cursorAckSet = getCursorAckSet(position);
+            if (cursorAckSet != null) {
+                BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
+                int lastCardinality = cursorBitSet.cardinality();
+                BitSetRecyclable givenBitSet = 
BitSetRecyclable.create().resetWords(ackSets);
+                cursorBitSet.and(givenBitSet);
+                givenBitSet.recycle();
+                int currentCardinality = cursorBitSet.cardinality();
+                ackedCount = lastCardinality - currentCardinality;
+                cursorBitSet.recycle();
+            } else if (pendingAcks.get(position.getLedgerId(), 
position.getEntryId()) != null) {
+                ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality();
+            }
+        }
+        return ackedCount;
+    }
+
+    private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl 
position, long batchSize) {
+        long unAckedCount = batchSize;
+        if (isAcknowledgmentAtBatchIndexLevelEnabled) {
+            long[] cursorAckSet = getCursorAckSet(position);
+            if (cursorAckSet != null) {
+                BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
+                unAckedCount = cursorBitSet.cardinality();
+            }
+        }
+        return unAckedCount;
+    }
+
     private void checkAckValidationError(CommandAck ack, PositionImpl 
position) {
         if (ack.hasValidationError()) {
             log.error("[{}] [{}] Received ack for corrupted message at {} - 
Reason: {}", subscription,
@@ -489,6 +564,26 @@ public class Consumer {
         }
     }
 
+    private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
+        Consumer ackOwnerConsumer = this;
+        if (Subscription.isIndividualAckMode(subType)) {
+            for (Consumer consumer : subscription.getConsumers()) {
+                if (consumer != this && 
consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
+                    ackOwnerConsumer = consumer;
+                    break;
+                }
+            }
+        }
+        return ackOwnerConsumer;
+    }
+
+    private long[] getCursorAckSet(PositionImpl position) {
+        if (!(subscription instanceof PersistentSubscription)) {
+            return null;
+        }
+        return (((PersistentSubscription) 
subscription).getCursor()).getDeletedBatchIndexesAsLongArray(position);
+    }
+
     private boolean isTransactionEnabled() {
         return subscription instanceof PersistentSubscription
                 && ((PersistentTopic) subscription.getTopic())
@@ -711,7 +806,6 @@ public class Consumer {
                 ? 
ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), 
position.getEntryId())
                 : null;
         if (ackedPosition != null) {
-            int totalAckedMsgs = (int) ackedPosition.first;
             if 
(!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), 
position.getEntryId())) {
                 // Message was already removed by the other consumer
                 return;
@@ -721,7 +815,7 @@ public class Consumer {
             }
             // unblock consumer-throttling when limit check is disabled or 
receives half of maxUnackedMessages =>
             // consumer can start again consuming messages
-            int unAckedMsgs = addAndGetUnAckedMsgs(ackOwnedConsumer, 
-totalAckedMsgs);
+            int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
             if ((((unAckedMsgs <= maxUnackedMessages / 2) && 
ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
                     && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
                     || !shouldBlockConsumerOnUnackMsgs()) {
@@ -751,7 +845,9 @@ public class Consumer {
             List<PositionImpl> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
             MutableInt totalRedeliveryMessages = new MutableInt(0);
             pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) 
-> {
-                totalRedeliveryMessages.add((int) batchSize);
+                int unAckedCount = (int) 
getUnAckedCountForBatchIndexLevelEnabled(PositionImpl.get(ledgerId, entryId),
+                        batchSize);
+                totalRedeliveryMessages.add(unAckedCount);
                 pendingPositions.add(new PositionImpl(ledgerId, entryId));
             });
 
@@ -775,9 +871,9 @@ public class Consumer {
             PositionImpl position = PositionImpl.get(msg.getLedgerId(), 
msg.getEntryId());
             LongPair longPair = pendingAcks.get(position.getLedgerId(), 
position.getEntryId());
             if (longPair != null) {
-                long batchSize = longPair.first;
+                int unAckedCount = (int) 
getUnAckedCountForBatchIndexLevelEnabled(position, longPair.first);
                 pendingAcks.remove(position.getLedgerId(), 
position.getEntryId());
-                totalRedeliveryMessages += batchSize;
+                totalRedeliveryMessages += unAckedCount;
                 pendingPositions.add(position);
             }
         }
@@ -811,8 +907,15 @@ public class Consumer {
     }
 
     private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
-        subscription.addUnAckedMessages(ackedMessages);
-        return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
+        int unackedMsgs = 0;
+        if (Subscription.isIndividualAckMode(subType)) {
+            subscription.addUnAckedMessages(ackedMessages);
+            unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, 
ackedMessages);
+        }
+        if (unackedMsgs < 0) {
+            log.error("unackedMsgs is : {}, ackedMessages : {}, consumer : 
{}", unackedMsgs, ackedMessages, consumer);
+        }
+        return unackedMsgs;
     }
 
     private void clearUnAckedMsgs() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 1bf27ac..c77d2bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -668,7 +668,7 @@ public class BatchMessageTest extends BrokerTestBase {
      *
      * @throws Exception
      */
-    @Test(dataProvider = "containerBuilder", timeOut = 3000)
+    @Test(dataProvider = "containerBuilder")
     public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws 
Exception {
         int numMsgs = 10;
         final String topicName = "persistent://prop/ns-abc/testConcurrentAck-" 
+ UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
new file mode 100644
index 0000000..20ba4f1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import static org.testng.Assert.assertEquals;
+
+@Test(groups = "broker")
+public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        super.baseSetup();
+    }
+
+    @Test
+    @SneakyThrows
+    public void testBatchMessageAck() {
+        int numMsgs = 40;
+        final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + 
UUID.randomUUID();
+        final String subscriptionName = "sub-batch-1";
+
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topicName)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .enableBatching(true)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutureList = 
Lists.newArrayList();
+        for (int i = 0; i < numMsgs; i++) {
+            byte[] message = ("batch-message-" + i).getBytes();
+            
sendFutureList.add(producer.newMessage().value(message).sendAsync());
+        }
+        FutureUtil.waitForAll(sendFutureList).get();
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) topic
+                .getSubscription(subscriptionName).getDispatcher();
+        Message<byte[]> receive1 = consumer.receive();
+        Message<byte[]> receive2 = consumer.receive();
+        consumer.acknowledge(receive1);
+        consumer.acknowledge(receive2);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
+        });
+        Message<byte[]> receive3 = consumer.receive();
+        Message<byte[]> receive4 = consumer.receive();
+        consumer.acknowledge(receive3);
+        consumer.acknowledge(receive4);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+        });
+        Message<byte[]> receive5 = consumer.receive();
+        consumer.negativeAcknowledge(receive5);
+        Awaitility.await().pollInterval(1, 
TimeUnit.MILLISECONDS).untilAsserted(() -> {
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
+        });
+        consumer.receive();
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index d4aae1a..4b947aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -515,15 +515,15 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
-        Consumer consumer1 = createConsumer(0, 2, false, 1);
-        Consumer consumer2 = createConsumer(0, 2, false, 2);
-        Consumer consumer3 = createConsumer(0, 2, false, 3);
-        Consumer consumer4 = createConsumer(1, 2, false, 4);
-        Consumer consumer5 = createConsumer(1, 1, false, 5);
-        Consumer consumer6 = createConsumer(1, 2, false, 6);
-        Consumer consumer7 = createConsumer(2, 1, false, 7);
-        Consumer consumer8 = createConsumer(2, 1, false, 8);
-        Consumer consumer9 = createConsumer(2, 1, false, 9);
+        Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
+        Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
+        Consumer consumer3 = createConsumer(topic, 0, 2, false, 3);
+        Consumer consumer4 = createConsumer(topic, 1, 2, false, 4);
+        Consumer consumer5 = createConsumer(topic, 1, 1, false, 5);
+        Consumer consumer6 = createConsumer(topic, 1, 2, false, 6);
+        Consumer consumer7 = createConsumer(topic, 2, 1, false, 7);
+        Consumer consumer8 = createConsumer(topic, 2, 1, false, 8);
+        Consumer consumer9 = createConsumer(topic, 2, 1, false, 9);
         dispatcher.addConsumer(consumer1);
         dispatcher.addConsumer(consumer2);
         dispatcher.addConsumer(consumer3);
@@ -547,7 +547,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
         Assert.assertEquals(getNextConsumer(dispatcher), consumer8);
         // in between add upper priority consumer with more permits
-        Consumer consumer10 = createConsumer(0, 2, false, 10);
+        Consumer consumer10 = createConsumer(topic, 0, 2, false, 10);
         dispatcher.addConsumer(consumer10);
         Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
         Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
@@ -559,12 +559,12 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void testFewBlockedConsumerSamePriority() throws Exception{
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
-        Consumer consumer1 = createConsumer(0, 2, false, 1);
-        Consumer consumer2 = createConsumer(0, 2, false, 2);
-        Consumer consumer3 = createConsumer(0, 2, false, 3);
-        Consumer consumer4 = createConsumer(0, 2, false, 4);
-        Consumer consumer5 = createConsumer(0, 1, true, 5);
-        Consumer consumer6 = createConsumer(0, 2, true, 6);
+        Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
+        Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
+        Consumer consumer3 = createConsumer(topic, 0, 2, false, 3);
+        Consumer consumer4 = createConsumer(topic, 0, 2, false, 4);
+        Consumer consumer5 = createConsumer(topic, 0, 1, true, 5);
+        Consumer consumer6 = createConsumer(topic, 0, 2, true, 6);
         dispatcher.addConsumer(consumer1);
         dispatcher.addConsumer(consumer2);
         dispatcher.addConsumer(consumer3);
@@ -586,18 +586,18 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void testFewBlockedConsumerDifferentPriority() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
-        Consumer consumer1 = createConsumer(0, 2, false, 1);
-        Consumer consumer2 = createConsumer(0, 2, false, 2);
-        Consumer consumer3 = createConsumer(0, 2, false, 3);
-        Consumer consumer4 = createConsumer(0, 2, false, 4);
-        Consumer consumer5 = createConsumer(0, 1, true, 5);
-        Consumer consumer6 = createConsumer(0, 2, true, 6);
-        Consumer consumer7 = createConsumer(1, 2, false, 7);
-        Consumer consumer8 = createConsumer(1, 10, true, 8);
-        Consumer consumer9 = createConsumer(1, 2, false, 9);
-        Consumer consumer10 = createConsumer(2, 2, false, 10);
-        Consumer consumer11 = createConsumer(2, 10, true, 11);
-        Consumer consumer12 = createConsumer(2, 2, false, 12);
+        Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
+        Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
+        Consumer consumer3 = createConsumer(topic, 0, 2, false, 3);
+        Consumer consumer4 = createConsumer(topic, 0, 2, false, 4);
+        Consumer consumer5 = createConsumer(topic, 0, 1, true, 5);
+        Consumer consumer6 = createConsumer(topic, 0, 2, true, 6);
+        Consumer consumer7 = createConsumer(topic, 1, 2, false, 7);
+        Consumer consumer8 = createConsumer(topic, 1, 10, true, 8);
+        Consumer consumer9 = createConsumer(topic, 1, 2, false, 9);
+        Consumer consumer10 = createConsumer(topic, 2, 2, false, 10);
+        Consumer consumer11 = createConsumer(topic, 2, 10, true, 11);
+        Consumer consumer12 = createConsumer(topic, 2, 2, false, 12);
         dispatcher.addConsumer(consumer1);
         dispatcher.addConsumer(consumer2);
         dispatcher.addConsumer(consumer3);
@@ -625,8 +625,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
         Assert.assertEquals(getNextConsumer(dispatcher), consumer12);
         // add consumer with lower priority again
-        Consumer consumer13 = createConsumer(0, 2, false, 13);
-        Consumer consumer14 = createConsumer(0, 2, true, 14);
+        Consumer consumer13 = createConsumer(topic, 0, 2, false, 13);
+        Consumer consumer14 = createConsumer(topic, 0, 2, true, 14);
         dispatcher.addConsumer(consumer13);
         dispatcher.addConsumer(consumer14);
         Assert.assertEquals(getNextConsumer(dispatcher), consumer13);
@@ -640,13 +640,13 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void testFewBlockedConsumerDifferentPriority2() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
-        Consumer consumer1 = createConsumer(0, 2, true, 1);
-        Consumer consumer2 = createConsumer(0, 2, true, 2);
-        Consumer consumer3 = createConsumer(0, 2, true, 3);
-        Consumer consumer4 = createConsumer(1, 2, false, 4);
-        Consumer consumer5 = createConsumer(1, 1, false, 5);
-        Consumer consumer6 = createConsumer(2, 1, false, 6);
-        Consumer consumer7 = createConsumer(2, 2, true, 7);
+        Consumer consumer1 = createConsumer(topic, 0, 2, true, 1);
+        Consumer consumer2 = createConsumer(topic, 0, 2, true, 2);
+        Consumer consumer3 = createConsumer(topic, 0, 2, true, 3);
+        Consumer consumer4 = createConsumer(topic, 1, 2, false, 4);
+        Consumer consumer5 = createConsumer(topic, 1, 1, false, 5);
+        Consumer consumer6 = createConsumer(topic, 2, 1, false, 6);
+        Consumer consumer7 = createConsumer(topic, 2, 2, true, 7);
         dispatcher.addConsumer(consumer1);
         dispatcher.addConsumer(consumer2);
         dispatcher.addConsumer(consumer3);
@@ -676,9 +676,10 @@ public class PersistentDispatcherFailoverConsumerTest {
         return null;
     }
 
-    private Consumer createConsumer(int priority, int permit, boolean blocked, 
int id) throws Exception {
+    private Consumer createConsumer(PersistentTopic topic, int priority, int 
permit, boolean blocked, int id) throws Exception {
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
         Consumer consumer =
-                new Consumer(null, SubType.Shared, "test-topic", id, priority, 
""+id, 5000,
+                new Consumer(sub, SubType.Shared, "test-topic", id, priority, 
""+id, 5000,
                         serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         try {
             consumer.flowPermits(permit);

Reply via email to