This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 08f6c337a7f5690ea53217cf9cf05ebc0be01e21 Author: fengyubiao <[email protected]> AuthorDate: Mon Dec 15 21:16:37 2025 +0800 [fix][broker]Fix incorrect backlog if use multiple acknowledge types on the same subscription (#25047) Co-authored-by: Jiwe Guo <[email protected]> (cherry picked from commit 81aff30c4b461b2c630f413c407ba014e7a5d571) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 128 ++++--- .../mledger/impl/NonDurableCursorImpl.java | 13 +- .../client/api/HybridTypesAcknowledgeTest.java | 422 +++++++++++++++++++++ 3 files changed, 512 insertions(+), 51 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ccda99cc497..8b2c525af0c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -105,6 +105,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder; import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.util.DateFormatter; @@ -208,7 +209,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Getter @VisibleForTesting @Nullable protected final ConcurrentSkipListMap<Position, BitSet> batchDeletedIndexes; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); private RateLimiter markDeleteLimiter; // The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory, @@ -238,6 +239,7 @@ public class ManagedCursorImpl implements ManagedCursor { final MarkDeleteCallback callback; final Object ctx; final Map<String, Long> properties; + final Runnable alignAcknowledgeStatusAfterPersisted; // If the callbackGroup is set, it means this mark-delete request was done on behalf of a group of request (just // persist the last one in the chain). In this case we need to trigger the callbacks for every request in the @@ -246,10 +248,26 @@ public class ManagedCursorImpl implements ManagedCursor { public MarkDeleteEntry(Position newPosition, Map<String, Long> properties, MarkDeleteCallback callback, Object ctx) { + this(newPosition, properties, callback, ctx, null); + } + + public MarkDeleteEntry(Position newPosition, Map<String, Long> properties, + MarkDeleteCallback callback, Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) { + if (alignAcknowledgeStatusAfterPersisted == null) { + alignAcknowledgeStatusAfterPersisted = () -> { + if (batchDeletedIndexes != null) { + batchDeletedIndexes.subMap(PositionFactory.EARLIEST, + false, PositionFactory.create(newPosition.getLedgerId(), + newPosition.getEntryId()), true).clear(); + } + persistentMarkDeletePosition = newPosition; + }; + } this.newPosition = newPosition; this.properties = properties; this.callback = callback; this.ctx = ctx; + this.alignAcknowledgeStatusAfterPersisted = alignAcknowledgeStatusAfterPersisted; } public void triggerComplete() { @@ -267,6 +285,10 @@ public class ManagedCursorImpl implements ManagedCursor { } } + public void alignAcknowledgeStatus() { + this.alignAcknowledgeStatusAfterPersisted.run(); + } + public void triggerFailed(ManagedLedgerException exception) { if (callbackGroup != null) { for (MarkDeleteEntry e : callbackGroup) { @@ -1482,6 +1504,56 @@ public class ManagedCursorImpl implements ManagedCursor { final Position newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition); + Runnable alignAcknowledgeStatusAfterPersisted = () -> { + // Correct the variable "messagesConsumedCounter". + // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the + // old one. + int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition); + if (compareRes > 0) { + MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( + Range.openClosed(newMarkDeletePosition, markDeletePosition))); + } else if (compareRes < 0) { + long entries = getNumberOfEntries(Range.openClosed(markDeletePosition, newMarkDeletePosition)); + MSG_CONSUMED_COUNTER_UPDATER.addAndGet(ManagedCursorImpl.this, entries); + } + individualDeletedMessages.removeAtMost(newMarkDeletePosition.getLedgerId(), + newMarkDeletePosition.getEntryId()); + + // Entries already acknowledged, which is larger than the new mark deleted position. + MutableLong ackedEntriesAfterMdPosition = new MutableLong(); + individualDeletedMessages.forEach((r) -> { + for (long i = r.lowerEndpoint().getEntryId() + 1; i <= r.upperEndpoint().getEntryId(); i++) { + ackedEntriesAfterMdPosition.incrementAndGet(); + } + return true; + }); + MSG_CONSUMED_COUNTER_UPDATER.addAndGet(ManagedCursorImpl.this, + -ackedEntriesAfterMdPosition.get().longValue()); + markDeletePosition = newMarkDeletePosition; + lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() + ? getProperties() : Collections.emptyMap(), null, null); + individualDeletedMessages.clear(); + if (batchDeletedIndexes != null) { + batchDeletedIndexes.clear(); + AckSetStateUtil.maybeGetAckSetState(newReadPosition).ifPresent(ackSetState -> { + long[] resetWords = ackSetState.getAckSet(); + if (resetWords != null) { + batchDeletedIndexes.put(newReadPosition, BitSet.valueOf(resetWords)); + } + }); + } + + Position oldReadPosition = readPosition; + if (oldReadPosition.compareTo(newReadPosition) >= 0) { + log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}", + ledger.getName(), newReadPosition, oldReadPosition, name); + } else { + log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on " + + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name); + } + readPosition = newReadPosition; + }; + VoidCallback finalCallback = new VoidCallback() { @Override public void operationComplete() { @@ -1489,40 +1561,6 @@ public class ManagedCursorImpl implements ManagedCursor { // modify mark delete and read position since we are able to persist new position for cursor lock.writeLock().lock(); try { - // Correct the variable "messagesConsumedCounter". - // BTW, no need to change "messagesConsumedCounter" if new "markDeletePosition" is the same as the - // old one. - int compareRes = ledger.comparePositions(markDeletePosition, newMarkDeletePosition); - if (compareRes > 0) { - MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries( - Range.openClosed(newMarkDeletePosition, markDeletePosition))); - } else if (compareRes < 0) { - MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), getNumberOfEntries( - Range.openClosed(markDeletePosition, newMarkDeletePosition))); - } - markDeletePosition = newMarkDeletePosition; - lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() - ? getProperties() : Collections.emptyMap(), null, null); - individualDeletedMessages.clear(); - if (batchDeletedIndexes != null) { - batchDeletedIndexes.clear(); - AckSetStateUtil.maybeGetAckSetState(newReadPosition).ifPresent(ackSetState -> { - long[] resetWords = ackSetState.getAckSet(); - if (resetWords != null) { - batchDeletedIndexes.put(newReadPosition, BitSet.valueOf(resetWords)); - } - }); - } - - Position oldReadPosition = readPosition; - if (oldReadPosition.compareTo(newReadPosition) >= 0) { - log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}", - ledger.getName(), newReadPosition, oldReadPosition, name); - } else { - log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on " - + "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name); - } - readPosition = newReadPosition; ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition); } finally { lock.writeLock().unlock(); @@ -1566,7 +1604,7 @@ public class ManagedCursorImpl implements ManagedCursor { public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { finalCallback.operationFailed(exception); } - }, null); + }, null, alignAcknowledgeStatusAfterPersisted); } @Override @@ -2181,7 +2219,7 @@ public class ManagedCursorImpl implements ManagedCursor { callback.markDeleteComplete(ctx); return; } - internalAsyncMarkDelete(newPosition, properties, callback, ctx); + internalAsyncMarkDelete(newPosition, properties, callback, ctx, null); } private Position ackBatchPosition(Position position) { @@ -2210,10 +2248,11 @@ public class ManagedCursorImpl implements ManagedCursor { } protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties, - final MarkDeleteCallback callback, final Object ctx) { + final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) { ledger.mbean.addMarkDeleteOp(); - MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx); + MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx, + alignAcknowledgeStatusAfterPersisted); // We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available synchronized (pendingMarkDeleteOps) { @@ -2312,14 +2351,7 @@ public class ManagedCursorImpl implements ManagedCursor { // point. lock.writeLock().lock(); try { - individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(), - mdEntry.newPosition.getEntryId()); - if (batchDeletedIndexes != null) { - batchDeletedIndexes.subMap(PositionFactory.EARLIEST, - false, PositionFactory.create(mdEntry.newPosition.getLedgerId(), - mdEntry.newPosition.getEntryId()), true).clear(); - } - persistentMarkDeletePosition = mdEntry.newPosition; + mdEntry.alignAcknowledgeStatus(); } finally { lock.writeLock().unlock(); } @@ -2576,7 +2608,7 @@ public class ManagedCursorImpl implements ManagedCursor { callback.deleteFailed(exception, ctx); } - }, ctx); + }, ctx, null); } catch (Exception e) { log.warn("[{}] [{}] Error doing asyncDelete [{}]", ledger.getName(), name, e.getMessage(), e); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 96906ddcbb0..246ca81c750 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -100,11 +100,18 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { @Override protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties, - final MarkDeleteCallback callback, final Object ctx) { + final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) { // Bypass persistence of mark-delete position and individually deleted messages info - MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx); - lastMarkDeleteEntry = mdEntry; + MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx, + alignAcknowledgeStatusAfterPersisted); + lock.writeLock().lock(); + try { + lastMarkDeleteEntry = mdEntry; + mdEntry.alignAcknowledgeStatus(); + } finally { + lock.writeLock().unlock(); + } // it is important to advance cursor so the retention can kick in as expected. ledger.onCursorMarkDeletePositionUpdated(NonDurableCursorImpl.this, mdEntry.newPosition); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java new file mode 100644 index 00000000000..e10a0931f0c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.tests.TestRetrySupport; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +@Slf4j +public class HybridTypesAcknowledgeTest extends TestRetrySupport { + + protected static final String DEFAULT_NS = "public/default"; + protected String clusterName = "c1"; + protected LocalBookkeeperEnsemble bkEnsemble; + protected ServiceConfiguration conf = new ServiceConfiguration(); + protected PulsarService pulsar; + protected URL url; + protected PulsarAdmin admin; + protected PulsarClientImpl client; + + @Override + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + incrementSetupNumber(); + bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + // Start broker. + setupBrokers(); + // Create default NS. + admin.clusters().createCluster(clusterName, new ClusterDataImpl()); + admin.tenants().createTenant(NamespaceName.get(DEFAULT_NS).getTenant(), + new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(clusterName))); + admin.namespaces().createNamespace(DEFAULT_NS); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + markCurrentSetupNumberCleaned(); + cleanupBrokers(); + if (bkEnsemble != null) { + bkEnsemble.stop(); + bkEnsemble = null; + } + } + + protected void cleanupBrokers() throws Exception { + // Cleanup broker2. + if (client != null) { + client.close(); + client = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (pulsar != null) { + pulsar.close(); + pulsar = null; + } + // Reset configs. + conf = new ServiceConfiguration(); + } + + protected void setupBrokers() throws Exception { + doInitConf(); + // Start broker. + pulsar = new PulsarService(conf); + pulsar.start(); + url = new URL(pulsar.getWebServiceAddress()); + admin = PulsarAdmin.builder().serviceHttpUrl(url.toString()).build(); + client = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + } + + protected void doInitConf() { + conf.setClusterName(clusterName); + conf.setAdvertisedAddress("localhost"); + conf.setBrokerServicePort(Optional.of(0)); + conf.setWebServicePort(Optional.of(0)); + conf.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + conf.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort() + "/foo"); + conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setBrokerShutdownTimeoutMs(0L); + conf.setLoadBalancerSheddingEnabled(false); + } + + @Test + public void testBacklogWithHybridAcknowledgement() throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String mlName = TopicName.get(topic).getPersistenceNamingEncoding(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(500); + config.setMinimumRolloverTime(1, TimeUnit.SECONDS); + ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config); + Producer<String> producer = client.newProducer(Schema.STRING) + .enableBatching(false) + .topic(topic) + .create(); + admin.topics().createSubscription(topic, "s1", MessageId.earliest); + admin.topics().createSubscription(topic, "s2", MessageId.earliest); + admin.topics().createSubscription(topic, "s3", MessageId.earliest); + + Consumer<String> c1 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s1").subscribe(); + Consumer<String> c2 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s2").subscribe(); + Consumer<String> c3 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s3").subscribe(); + Consumer<String> c4 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s4").subscribe(); + + Random random = new Random(); + List<Message<String>> cachedMessagesInMem2 = new ArrayList<>(); + List<Message<String>> cachedMessagesInMem3 = new ArrayList<>(); + List<Message<String>> cachedMessagesInMem4 = new ArrayList<>(); + List<String> randomlyAcked = new ArrayList<>(); + + for (int i = 0; i < 400; i++) { + producer.send("message-" + i); + } + + for (int i = 0; i < 400; i++) { + Message<String> m1 = c1.receive(2, TimeUnit.SECONDS); + c1.acknowledge(m1); + if (random.nextBoolean()) { + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + c2.acknowledge(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + c3.acknowledge(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + c4.acknowledge(m4); + randomlyAcked.add(m4.getValue()); + } else { + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem2.add(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem3.add(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem4.add(m4); + } + } + + for (int i = 400; i < 900; i++) { + producer.send("message-" + i); + } + + for (int i = 400; i < 600; i++) { + Message<String> m1 = c1.receive(2, TimeUnit.SECONDS); + c1.acknowledge(m1); + if (random.nextBoolean()) { + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + c2.acknowledge(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + c3.acknowledge(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + c4.acknowledge(m4); + randomlyAcked.add(m4.getValue()); + } else { + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem2.add(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem3.add(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem4.add(m4); + } + } + + log.info("s3 cached unacked messages: " + cachedMessagesInMem3.size() + ", acked list: " + + randomlyAcked.size() + ", acked set: " + new HashSet<>(randomlyAcked).size()); + assertEquals(randomlyAcked.size(), new HashSet<>(randomlyAcked).size()); + + for (int i = 600; i < 900; i++) { + Message<String> m1 = c1.receive(2, TimeUnit.SECONDS); + c1.acknowledge(m1); + } + + Awaitility.await().untilAsserted(() -> { + Map<String, ? extends SubscriptionStats> statsPrecise = + admin.topics().getStats(topic, true).getSubscriptions(); + assertEquals(statsPrecise.get("s2").getMsgBacklog(), 300 + cachedMessagesInMem2.size()); + assertEquals(statsPrecise.get("s3").getMsgBacklog(), 300 + cachedMessagesInMem3.size()); + assertEquals(statsPrecise.get("s4").getMsgBacklog(), 300 + cachedMessagesInMem4.size()); + }); + + // c2: ack all messages that un-acked + // c3: seek to skip all messages that un0acked. + // c4: cumulative to skip all messages that un-acked. + // c2. + for (Message<String> m2 : cachedMessagesInMem2) { + c2.acknowledge(m2); + } + // c3. + MessageIdAdv messageIdAdv = + (MessageIdAdv) cachedMessagesInMem3.get(cachedMessagesInMem3.size() - 1).getMessageId(); + Position pos = ml.getNextValidPosition(PositionFactory.create(messageIdAdv.getLedgerId(), + messageIdAdv.getEntryId())); + c3.seek(new MessageIdImpl(pos.getLedgerId(), pos.getEntryId(), -1)); + c3.acknowledgeCumulative(cachedMessagesInMem3.get(cachedMessagesInMem3.size() - 1)); + // c4. + c4.acknowledgeCumulative(cachedMessagesInMem3.get(cachedMessagesInMem3.size() - 1)); + + // Verify: the backlog is the same as precise one. + verifyBacklogSameAsPrecise(topic); + + // cleanup. + c1.close(); + c2.close(); + c3.close(); + c4.close(); + producer.close(); + admin.topics().delete(topic); + } + + @DataProvider + public Object[][] resetDirection() { + return new Object[][]{ + {"forward"}, + {"stationary"}, + {"backward"} + }; + } + + @Test(dataProvider = "resetDirection") + public void testResetCursorAndClearFollowingIndividualAcks(String resetDirection) throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String mlName = TopicName.get(topic).getPersistenceNamingEncoding(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(500); + config.setMinimumRolloverTime(1, TimeUnit.SECONDS); + ManagedLedgerFactory factory = pulsar.getDefaultManagedLedgerFactory(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config); + Producer<String> producer = client.newProducer(Schema.STRING) + .enableBatching(false) + .topic(topic) + .create(); + admin.topics().createSubscription(topic, "s1", MessageId.earliest); + admin.topics().createSubscription(topic, "s2", MessageId.earliest); + admin.topics().createSubscription(topic, "s3", MessageId.earliest); + admin.topics().createSubscription(topic, "s4", MessageId.earliest); + Consumer<String> c1 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s1").subscribe(); + Consumer<String> c2 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s2").subscribe(); + Consumer<String> c3 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s3").subscribe(); + Consumer<String> c4 = client.newConsumer(Schema.STRING).topic(topic).subscriptionName("s4").subscribe(); + + Random random = new Random(); + List<Message<String>> cachedMessagesInMem2 = new ArrayList<>(); + List<Message<String>> cachedMessagesInMem3 = new ArrayList<>(); + List<Message<String>> cachedMessagesInMem4 = new ArrayList<>(); + List<String> randomlyAcked = new ArrayList<>(); + + for (int i = 0; i < 1000; i++) { + producer.send("message-" + i); + } + + // Continuously acknowledges 600 messages. + MessageId messageId200 = null; + for (int i = 0; i < 600; i++) { + Message<String> m1 = c1.receive(2, TimeUnit.SECONDS); + c1.acknowledge(m1); + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + c2.acknowledge(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + c3.acknowledge(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + c4.acknowledge(m4); + randomlyAcked.add(m4.getValue()); + if (i == 200) { + messageId200 = m3.getMessageId(); + } + } + + // Makes acknowledge holes between 601~900. + for (int i = 600; i < 900; i++) { + Message<String> m1 = c1.receive(2, TimeUnit.SECONDS); + c1.acknowledge(m1); + // The condition "Method…" + if (random.nextBoolean() && i % 100 != 0) { + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + c2.acknowledge(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + c3.acknowledge(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + c4.acknowledge(m4); + randomlyAcked.add(m4.getValue()); + } else { + Message<String> m2 = c2.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem2.add(m2); + Message<String> m3 = c3.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem3.add(m3); + Message<String> m4 = c4.receive(2, TimeUnit.SECONDS); + cachedMessagesInMem4.add(m4); + } + } + + Awaitility.await().untilAsserted(() -> { + Map<String, ? extends SubscriptionStats> statsPrecise = + admin.topics().getStats(topic, true).getSubscriptions(); + assertEquals(statsPrecise.get("s2").getMsgBacklog(), 100 + cachedMessagesInMem2.size()); + assertEquals(statsPrecise.get("s3").getMsgBacklog(), 100 + cachedMessagesInMem3.size()); + assertEquals(statsPrecise.get("s4").getMsgBacklog(), 100 + cachedMessagesInMem4.size()); + }); + + log.info("subscription cached unacked messages: " + cachedMessagesInMem3.size() + ", acked list: " + + randomlyAcked.size() + ", acked set: " + new HashSet<>(randomlyAcked).size()); + assertEquals(randomlyAcked.size(), new HashSet<>(randomlyAcked).size()); + + + // c1: continuously ack all messages. + // c2: ack all messages that un-acked + // c3: seek to skip all messages that un0acked. + // c4: cumulative to skip all messages that un-acked. + + // c1. + for (int i = 900; i < 1000; i++) { + Message<String> m1 = c1.receive(2, TimeUnit.SECONDS); + c1.acknowledge(m1); + } + + // c2. + for (Message<String> m2 : cachedMessagesInMem2) { + c2.acknowledge(m2); + } + + // c3. + ManagedCursorImpl cursor3 = (ManagedCursorImpl) ml.getCursors().get("s3"); + assertTrue(cachedMessagesInMem3.size() >= 2); + MessageId targetMessageId; + if ("forward".equals(resetDirection)) { + targetMessageId = cachedMessagesInMem3.get(cachedMessagesInMem3.size() >> 1).getMessageId(); + } else if ("stationary".equals(resetDirection)) { + Position mdPosition = ml.getCursors().get("s3").getMarkDeletedPosition(); + targetMessageId = new MessageIdImpl(mdPosition.getLedgerId(), mdPosition.getEntryId(), -1); + } else { + targetMessageId = messageId200; + } + log.info("cursor3 before seek. md-pos: {}, individualAcks: {}, targetPosition: {}, backlog: {}", + cursor3.getMarkDeletedPosition(), cursor3.getIndividuallyDeletedMessages(), targetMessageId, + cursor3.getNumberOfEntriesInBacklog(false)); + c3.seek(targetMessageId); + log.info("cursor3 after seek. md-pos: {}, individualAcks: {}, targetPosition: {}, backlog: {}", + cursor3.getMarkDeletedPosition(), cursor3.getIndividuallyDeletedMessages(), targetMessageId, + cursor3.getNumberOfEntriesInBacklog(false)); + + // c4. + c4.acknowledgeCumulative(targetMessageId); + + // Verify: the backlog is the same as precise one. + verifyBacklogSameAsPrecise(topic); + + // cleanup. + c1.close(); + c2.close(); + c3.close(); + c4.close(); + producer.close(); + admin.topics().delete(topic); + } + + private void verifyBacklogSameAsPrecise(String topic) { + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + Map<String, ? extends SubscriptionStats> stats = admin.topics().getStats(topic).getSubscriptions(); + Map<String, ? extends SubscriptionStats> statsPrecise = + admin.topics().getStats(topic, true).getSubscriptions(); + for (Map.Entry<String, ? extends SubscriptionStats> item : stats.entrySet()) { + long preciseBacklog = statsPrecise.get(item.getKey()).getMsgBacklog(); + long backlog = item.getValue().getMsgBacklog(); + log.info("subscription: " + item.getKey() + ", preciseBacklog: " + + preciseBacklog + ", backlog: " + backlog); + assertEquals(backlog, preciseBacklog); + } + }); + } +}
