This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fdc5eab8eb285922cd86fab3b5b998e815b22d72
Author: lipenghui <[email protected]>
AuthorDate: Wed Aug 4 08:19:20 2021 +0800

    Fix data lost when using earliest position to subscribe to a topic (#11547)
    
    When subscribing to a topic with earliest position, the ManagedLedger 
always using
    the last position to init the cursor. If the no cursor update happens and 
the broker restarts
    or topic been unloaded or the topic ownership changed, will lead to the 
data lost, the unacked messages
    will not redeliver to the consumer again.
    
    The root cause is if we are using the last position to init the cursor, the 
cursor will update the
    mark delete position as the last position first to the Zookeeper, if the 
cursor can't a chance to
    update the mark delete position again before been closed, when recoving the 
cursor again, will using
    the mark delete posiion that stored in the Zookeeper, so the issue happens.
    
    The fix is to add check for the initial position of the cursor, if we are 
using the Earliest as the initial position,
    use the first position to init the cursor.
    
    The new added test can cover the changes, and without this change, the test 
would failed.
    
    (cherry picked from commit 035a6bab7af8ed17f811c16b518dc02eea2435a1)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 54 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0e4a82c..5a5fe49 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -791,7 +791,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, 
config, this, cursorName);
         CompletableFuture<ManagedCursor> cursorFuture = new 
CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
-        cursor.initialize(getLastPosition(), properties, new VoidCallback() {
+        PositionImpl position = InitialPosition.Earliest == initialPosition ? 
getFirstPosition() : getLastPosition();
+        cursor.initialize(position, properties, new VoidCallback() {
             @Override
             public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index e828598..f594d4e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -36,6 +38,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Sets;
 
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 
@@ -179,4 +182,55 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test(timeOut = 30000)
+    public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() 
throws Exception {
+
+        final String subName = "my-subscriber-name";
+        final String topicName = 
"testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID();
+        final int messages = 100;
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendResults = new 
ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            sendResults.add(producer.sendAsync("Hello - " + i));
+        }
+        producer.flush();
+
+        FutureUtil.waitForAll(sendResults).get();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        List<Message<String>> received = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            received.add(consumer.receive());
+        }
+
+        assertEquals(received.size(), messages);
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        admin.topics().unload(topicName);
+
+        // The consumer does not ack any messages, so after unloading the 
topic,
+        // the consumer should get the unacked messages again
+
+        received.clear();
+        for (int i = 0; i < messages; i++) {
+            received.add(consumer.receive());
+        }
+
+        assertEquals(received.size(), messages);
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        consumer.close();
+        producer.close();
+    }
 }

Reply via email to