This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cf526bf88a49c015f0656b3bc59d4e7ca5440c41 Author: lipenghui <[email protected]> AuthorDate: Wed Aug 4 08:19:20 2021 +0800 Fix data lost when using earliest position to subscribe to a topic (#11547) When subscribing to a topic with earliest position, the ManagedLedger always using the last position to init the cursor. If the no cursor update happens and the broker restarts or topic been unloaded or the topic ownership changed, will lead to the data lost, the unacked messages will not redeliver to the consumer again. The root cause is if we are using the last position to init the cursor, the cursor will update the mark delete position as the last position first to the Zookeeper, if the cursor can't a chance to update the mark delete position again before been closed, when recoving the cursor again, will using the mark delete posiion that stored in the Zookeeper, so the issue happens. The fix is to add check for the initial position of the cursor, if we are using the Earliest as the initial position, use the first position to init the cursor. The new added test can cover the changes, and without this change, the test would failed. (cherry picked from commit 035a6bab7af8ed17f811c16b518dc02eea2435a1) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +- .../pulsar/client/api/ConsumerRedeliveryTest.java | 54 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index fdf8a95..dcddc43 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -903,7 +903,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName); CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); - cursor.initialize(getLastPosition(), properties, new VoidCallback() { + PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); + cursor.initialize(position, properties, new VoidCallback() { @Override public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index ed9f548..9b15469 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -30,6 +30,7 @@ import lombok.Cleanup; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -40,6 +41,7 @@ import org.testng.annotations.Test; import com.google.common.collect.Sets; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; @@ -247,4 +249,56 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { assertEquals(consumer2.getTotalIncomingMessages(), queueSize); log.info("-- Exiting {} test --", methodName); } + + @Test(timeOut = 30000) + public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exception { + + final String subName = "my-subscriber-name"; + final String topicName = "testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID(); + final int messages = 100; + + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + List<CompletableFuture<MessageId>> sendResults = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + sendResults.add(producer.sendAsync("Hello - " + i)); + } + producer.flush(); + + FutureUtil.waitForAll(sendResults).get(); + + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + List<Message<String>> received = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + assertEquals(received.size(), messages); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + admin.topics().unload(topicName); + + // The consumer does not ack any messages, so after unloading the topic, + // the consumer should get the unacked messages again + + received.clear(); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + assertEquals(received.size(), messages); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + consumer.close(); + producer.close(); + } }
