This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dde417cbc2d101478a8159dfbd8db59833cd9b55 Author: Jiwei Guo <[email protected]> AuthorDate: Wed Jan 26 16:05:16 2022 +0800 Fix batch message ack does not decrease the unacked-msg count. (#13383) (cherry picked from commit d64f2916e0feee1b09459b4db6094db1143067a1) --- .../org/apache/pulsar/broker/service/Consumer.java | 129 ++++++++++++++++++--- .../pulsar/broker/service/BatchMessageTest.java | 2 +- .../BatchMessageWithBatchIndexLevelTest.java | 108 +++++++++++++++++ .../PersistentDispatcherFailoverConsumerTest.java | 79 ++++++------- 4 files changed, 265 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 4f886d3..cf9f5bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.Map; @@ -56,6 +57,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.transaction.common.exception.TransactionConflictException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +128,7 @@ public class Consumer { private PositionImpl readPositionWhenJoining; private final String clientAddress; // IP address only, no port number included private final MessageId startMessageId; + private final boolean isAcknowledgmentAtBatchIndexLevelEnabled; public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, @@ -182,6 +185,8 @@ public class Consumer { } this.clientAddress = cnx.clientSourceAddress(); + this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService() + .getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled(); } public SubType subType() { @@ -233,7 +238,7 @@ public class Consumer { writePromise.setSuccess(null); return writePromise; } - + int unackedMessages = totalMessages; // Note // Must ensure that the message is written to the pendingAcks before sent is first , because this consumer // is possible to disconnect at this time. @@ -243,11 +248,16 @@ public class Consumer { if (entry != null) { int batchSize = batchSizes.getBatchSize(i); int stickyKeyHash = getStickyKeyHash(entry); + long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId())); + if (ackSet != null) { + unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality()); + } pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash); - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in" + " broker.service.Consumer for consumerId: {}", - topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, consumerId); + topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, + consumerId); } } } @@ -267,7 +277,7 @@ public class Consumer { + " for consumerId: {}; avgMessagesPerEntry is {}", topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry); } - incrementUnackedMessages(totalMessages); + incrementUnackedMessages(unackedMessages); msgOut.recordMultipleEvents(totalMessages, totalBytes); msgOutCounter.add(totalMessages); bytesOutCounter.add(totalBytes); @@ -381,16 +391,19 @@ public class Consumer { //this method is for individual ack not carry the transaction private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String, Long> properties) { List<Position> positionsAcked = new ArrayList<>(); - for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; + long ackedCount = 0; + long batchSize = getBatchSize(msgId); + Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); if (msgId.getAckSetsCount() > 0) { long[] ackSets = new long[msgId.getAckSetsCount()]; for (int j = 0; j < msgId.getAckSetsCount(); j++) { ackSets[j] = msgId.getAckSetAt(j); } position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); + ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets); if (isTransactionEnabled()) { //sync the batch position bit set point, in order to delete the position in pending acks if (Subscription.isIndividualAckMode(subType)) { @@ -400,7 +413,20 @@ public class Consumer { } } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + if (isAcknowledgmentAtBatchIndexLevelEnabled) { + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet != null) { + ackedCount = batchSize - BitSet.valueOf(cursorAckSet).cardinality(); + } else { + ackedCount = batchSize; + } + } else { + ackedCount = batchSize; + } } + + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + positionsAcked.add(position); checkCanRemovePendingAcksAndHandle(position, msgId); @@ -430,7 +456,6 @@ public class Consumer { private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) { // Individual ack List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>(); - if (!isTransactionEnabled()) { return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); @@ -476,6 +501,56 @@ public class Consumer { return completableFuture; } + private long getBatchSize(MessageIdData msgId) { + long batchSize = 1; + if (Subscription.isIndividualAckMode(subType)) { + LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId()); + // Consumer may ack the msg that not belongs to it. + if (longPair == null) { + Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); + longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId()); + if (longPair != null) { + batchSize = longPair.first; + } + } else { + batchSize = longPair.first; + } + } + return batchSize; + } + + private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) { + long ackedCount = 0; + if (isAcknowledgmentAtBatchIndexLevelEnabled) { + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet != null) { + BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); + int lastCardinality = cursorBitSet.cardinality(); + BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); + cursorBitSet.and(givenBitSet); + givenBitSet.recycle(); + int currentCardinality = cursorBitSet.cardinality(); + ackedCount = lastCardinality - currentCardinality; + cursorBitSet.recycle(); + } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { + ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); + } + } + return ackedCount; + } + + private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) { + long unAckedCount = batchSize; + if (isAcknowledgmentAtBatchIndexLevelEnabled) { + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet != null) { + BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); + unAckedCount = cursorBitSet.cardinality(); + } + } + return unAckedCount; + } + private void checkAckValidationError(CommandAck ack, PositionImpl position) { if (ack.hasValidationError()) { log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription, @@ -489,6 +564,26 @@ public class Consumer { } } + private Consumer getAckOwnerConsumer(long ledgerId, long entryId) { + Consumer ackOwnerConsumer = this; + if (Subscription.isIndividualAckMode(subType)) { + for (Consumer consumer : subscription.getConsumers()) { + if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) { + ackOwnerConsumer = consumer; + break; + } + } + } + return ackOwnerConsumer; + } + + private long[] getCursorAckSet(PositionImpl position) { + if (!(subscription instanceof PersistentSubscription)) { + return null; + } + return (((PersistentSubscription) subscription).getCursor()).getDeletedBatchIndexesAsLongArray(position); + } + private boolean isTransactionEnabled() { return subscription instanceof PersistentSubscription && ((PersistentTopic) subscription.getTopic()) @@ -711,7 +806,6 @@ public class Consumer { ? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) : null; if (ackedPosition != null) { - int totalAckedMsgs = (int) ackedPosition.first; if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) { // Message was already removed by the other consumer return; @@ -721,7 +815,7 @@ public class Consumer { } // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages => // consumer can start again consuming messages - int unAckedMsgs = addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs); + int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); if ((((unAckedMsgs <= maxUnackedMessages / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs) && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) || !shouldBlockConsumerOnUnackMsgs()) { @@ -751,7 +845,9 @@ public class Consumer { List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size()); MutableInt totalRedeliveryMessages = new MutableInt(0); pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { - totalRedeliveryMessages.add((int) batchSize); + int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(PositionImpl.get(ledgerId, entryId), + batchSize); + totalRedeliveryMessages.add(unAckedCount); pendingPositions.add(new PositionImpl(ledgerId, entryId)); }); @@ -775,9 +871,9 @@ public class Consumer { PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId()); LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId()); if (longPair != null) { - long batchSize = longPair.first; + int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, longPair.first); pendingAcks.remove(position.getLedgerId(), position.getEntryId()); - totalRedeliveryMessages += batchSize; + totalRedeliveryMessages += unAckedCount; pendingPositions.add(position); } } @@ -811,8 +907,15 @@ public class Consumer { } private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { - subscription.addUnAckedMessages(ackedMessages); - return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + int unackedMsgs = 0; + if (Subscription.isIndividualAckMode(subType)) { + subscription.addUnAckedMessages(ackedMessages); + unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + } + if (unackedMsgs < 0) { + log.error("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer); + } + return unackedMsgs; } private void clearUnAckedMsgs() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 1bf27ac..c77d2bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -668,7 +668,7 @@ public class BatchMessageTest extends BrokerTestBase { * * @throws Exception */ - @Test(dataProvider = "containerBuilder", timeOut = 3000) + @Test(dataProvider = "containerBuilder") public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Exception { int numMsgs = 10; final String topicName = "persistent://prop/ns-abc/testConcurrentAck-" + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java new file mode 100644 index 0000000..20ba4f1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import static org.testng.Assert.assertEquals; + +@Test(groups = "broker") +public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + super.baseSetup(); + } + + @Test + @SneakyThrows + public void testBatchMessageAck() { + int numMsgs = 40; + final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID(); + final String subscriptionName = "sub-batch-1"; + + ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient + .newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .receiverQueueSize(10) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .subscribe(); + + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(topicName) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(true) + .create(); + + List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList(); + for (int i = 0; i < numMsgs; i++) { + byte[] message = ("batch-message-" + i).getBytes(); + sendFutureList.add(producer.newMessage().value(message).sendAsync()); + } + FutureUtil.waitForAll(sendFutureList).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) topic + .getSubscription(subscriptionName).getDispatcher(); + Message<byte[]> receive1 = consumer.receive(); + Message<byte[]> receive2 = consumer.receive(); + consumer.acknowledge(receive1); + consumer.acknowledge(receive2); + Awaitility.await().untilAsserted(() -> { + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18); + }); + Message<byte[]> receive3 = consumer.receive(); + Message<byte[]> receive4 = consumer.receive(); + consumer.acknowledge(receive3); + consumer.acknowledge(receive4); + Awaitility.await().untilAsserted(() -> { + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + }); + Message<byte[]> receive5 = consumer.receive(); + consumer.negativeAcknowledge(receive5); + Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> { + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0); + }); + consumer.receive(); + Awaitility.await().untilAsserted(() -> { + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index d4aae1a..4b947aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -515,15 +515,15 @@ public class PersistentDispatcherFailoverConsumerTest { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); - Consumer consumer1 = createConsumer(0, 2, false, 1); - Consumer consumer2 = createConsumer(0, 2, false, 2); - Consumer consumer3 = createConsumer(0, 2, false, 3); - Consumer consumer4 = createConsumer(1, 2, false, 4); - Consumer consumer5 = createConsumer(1, 1, false, 5); - Consumer consumer6 = createConsumer(1, 2, false, 6); - Consumer consumer7 = createConsumer(2, 1, false, 7); - Consumer consumer8 = createConsumer(2, 1, false, 8); - Consumer consumer9 = createConsumer(2, 1, false, 9); + Consumer consumer1 = createConsumer(topic, 0, 2, false, 1); + Consumer consumer2 = createConsumer(topic, 0, 2, false, 2); + Consumer consumer3 = createConsumer(topic, 0, 2, false, 3); + Consumer consumer4 = createConsumer(topic, 1, 2, false, 4); + Consumer consumer5 = createConsumer(topic, 1, 1, false, 5); + Consumer consumer6 = createConsumer(topic, 1, 2, false, 6); + Consumer consumer7 = createConsumer(topic, 2, 1, false, 7); + Consumer consumer8 = createConsumer(topic, 2, 1, false, 8); + Consumer consumer9 = createConsumer(topic, 2, 1, false, 9); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); @@ -547,7 +547,7 @@ public class PersistentDispatcherFailoverConsumerTest { Assert.assertEquals(getNextConsumer(dispatcher), consumer7); Assert.assertEquals(getNextConsumer(dispatcher), consumer8); // in between add upper priority consumer with more permits - Consumer consumer10 = createConsumer(0, 2, false, 10); + Consumer consumer10 = createConsumer(topic, 0, 2, false, 10); dispatcher.addConsumer(consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer10); @@ -559,12 +559,12 @@ public class PersistentDispatcherFailoverConsumerTest { public void testFewBlockedConsumerSamePriority() throws Exception{ PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); - Consumer consumer1 = createConsumer(0, 2, false, 1); - Consumer consumer2 = createConsumer(0, 2, false, 2); - Consumer consumer3 = createConsumer(0, 2, false, 3); - Consumer consumer4 = createConsumer(0, 2, false, 4); - Consumer consumer5 = createConsumer(0, 1, true, 5); - Consumer consumer6 = createConsumer(0, 2, true, 6); + Consumer consumer1 = createConsumer(topic, 0, 2, false, 1); + Consumer consumer2 = createConsumer(topic, 0, 2, false, 2); + Consumer consumer3 = createConsumer(topic, 0, 2, false, 3); + Consumer consumer4 = createConsumer(topic, 0, 2, false, 4); + Consumer consumer5 = createConsumer(topic, 0, 1, true, 5); + Consumer consumer6 = createConsumer(topic, 0, 2, true, 6); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); @@ -586,18 +586,18 @@ public class PersistentDispatcherFailoverConsumerTest { public void testFewBlockedConsumerDifferentPriority() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); - Consumer consumer1 = createConsumer(0, 2, false, 1); - Consumer consumer2 = createConsumer(0, 2, false, 2); - Consumer consumer3 = createConsumer(0, 2, false, 3); - Consumer consumer4 = createConsumer(0, 2, false, 4); - Consumer consumer5 = createConsumer(0, 1, true, 5); - Consumer consumer6 = createConsumer(0, 2, true, 6); - Consumer consumer7 = createConsumer(1, 2, false, 7); - Consumer consumer8 = createConsumer(1, 10, true, 8); - Consumer consumer9 = createConsumer(1, 2, false, 9); - Consumer consumer10 = createConsumer(2, 2, false, 10); - Consumer consumer11 = createConsumer(2, 10, true, 11); - Consumer consumer12 = createConsumer(2, 2, false, 12); + Consumer consumer1 = createConsumer(topic, 0, 2, false, 1); + Consumer consumer2 = createConsumer(topic, 0, 2, false, 2); + Consumer consumer3 = createConsumer(topic, 0, 2, false, 3); + Consumer consumer4 = createConsumer(topic, 0, 2, false, 4); + Consumer consumer5 = createConsumer(topic, 0, 1, true, 5); + Consumer consumer6 = createConsumer(topic, 0, 2, true, 6); + Consumer consumer7 = createConsumer(topic, 1, 2, false, 7); + Consumer consumer8 = createConsumer(topic, 1, 10, true, 8); + Consumer consumer9 = createConsumer(topic, 1, 2, false, 9); + Consumer consumer10 = createConsumer(topic, 2, 2, false, 10); + Consumer consumer11 = createConsumer(topic, 2, 10, true, 11); + Consumer consumer12 = createConsumer(topic, 2, 2, false, 12); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); @@ -625,8 +625,8 @@ public class PersistentDispatcherFailoverConsumerTest { Assert.assertEquals(getNextConsumer(dispatcher), consumer10); Assert.assertEquals(getNextConsumer(dispatcher), consumer12); // add consumer with lower priority again - Consumer consumer13 = createConsumer(0, 2, false, 13); - Consumer consumer14 = createConsumer(0, 2, true, 14); + Consumer consumer13 = createConsumer(topic, 0, 2, false, 13); + Consumer consumer14 = createConsumer(topic, 0, 2, true, 14); dispatcher.addConsumer(consumer13); dispatcher.addConsumer(consumer14); Assert.assertEquals(getNextConsumer(dispatcher), consumer13); @@ -640,13 +640,13 @@ public class PersistentDispatcherFailoverConsumerTest { public void testFewBlockedConsumerDifferentPriority2() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null); - Consumer consumer1 = createConsumer(0, 2, true, 1); - Consumer consumer2 = createConsumer(0, 2, true, 2); - Consumer consumer3 = createConsumer(0, 2, true, 3); - Consumer consumer4 = createConsumer(1, 2, false, 4); - Consumer consumer5 = createConsumer(1, 1, false, 5); - Consumer consumer6 = createConsumer(2, 1, false, 6); - Consumer consumer7 = createConsumer(2, 2, true, 7); + Consumer consumer1 = createConsumer(topic, 0, 2, true, 1); + Consumer consumer2 = createConsumer(topic, 0, 2, true, 2); + Consumer consumer3 = createConsumer(topic, 0, 2, true, 3); + Consumer consumer4 = createConsumer(topic, 1, 2, false, 4); + Consumer consumer5 = createConsumer(topic, 1, 1, false, 5); + Consumer consumer6 = createConsumer(topic, 2, 1, false, 6); + Consumer consumer7 = createConsumer(topic, 2, 2, true, 7); dispatcher.addConsumer(consumer1); dispatcher.addConsumer(consumer2); dispatcher.addConsumer(consumer3); @@ -676,9 +676,10 @@ public class PersistentDispatcherFailoverConsumerTest { return null; } - private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { + private Consumer createConsumer(PersistentTopic topic, int priority, int permit, boolean blocked, int id) throws Exception { + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer = - new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, + new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, 5000, serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); try { consumer.flowPermits(permit);
