This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new dd28bb41f44 [fix] [ml] Reader can set read-pos to a deleted ledger 
(#21248)
dd28bb41f44 is described below

commit dd28bb41f44de7d88aec39c79cc8014d64ad4476
Author: fengyubiao <[email protected]>
AuthorDate: Sun Oct 8 11:25:02 2023 +0800

    [fix] [ml] Reader can set read-pos to a deleted ledger (#21248)
    
    ### Motivation
    
    After trimming ledgers, the variable `lastConfirmedEntry` of the managed 
ledger might point to a deleted ledger (the latest ledger that contained data).
    
    There is a bug that allows users to set the start read position to a 
non-existent ledger or a deleted ledger when creating a subscription. This 
makes the `backlog` and `markDeletedPosition` wrong.
    
    ### Modifications
    
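    Fix the bug:
    - `ManagedLedgerImpl.isValidPosition` now treats a position as invalid when its ledger is not in the `ledgers` map.
    - `NonDurableCursorImpl.recoverCursor` now derives `markDeletePosition` from the resolved read position.
    - The precise (`isPrecise`) entry count in `ManagedCursorImpl` now returns 0 when `markDeletePosition` is at or after the last position.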
    
    (cherry picked from commit 4ee5cd78147890ce3c23092edbfc11370972728e)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   3 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  21 +-
 .../mledger/impl/NonDurableCursorImpl.java         |   2 +-
 .../client/api/NonDurableSubscriptionTest.java     | 216 +++++++++++++++++++++
 4 files changed, 234 insertions(+), 8 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9063602dd70..3a4d371019c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1107,6 +1107,9 @@ public class ManagedCursorImpl implements ManagedCursor {
                     messagesConsumedCounter, markDeletePosition, readPosition);
         }
         if (isPrecise) {
+            if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
+                return 0;
+            }
             return getNumberOfEntries(Range.openClosed(markDeletePosition, 
ledger.getLastPosition()));
         }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0f65a1a089c..d51b48bdda5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3630,23 +3630,30 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
      * @return true if the position is valid, false otherwise
      */
     public boolean isValidPosition(PositionImpl position) {
-        PositionImpl last = lastConfirmedEntry;
+        PositionImpl lac = lastConfirmedEntry;
         if (log.isDebugEnabled()) {
-            log.debug("IsValid position: {} -- last: {}", position, last);
+            log.debug("IsValid position: {} -- last: {}", position, lac);
         }
 
-        if (position.getEntryId() < 0) {
+        if (!ledgers.containsKey(position.getLedgerId())){
             return false;
-        } else if (position.getLedgerId() > last.getLedgerId()) {
+        } else if (position.getEntryId() < 0) {
             return false;
-        } else if (position.getLedgerId() == last.getLedgerId()) {
-            return position.getEntryId() <= (last.getEntryId() + 1);
+        } else if (currentLedger != null && position.getLedgerId() == 
currentLedger.getId()) {
+            // If current ledger is empty, the largest read position can be 
"{current_ledger: 0}".
+            // Else, the read position can be set to "{LAC + 1}" when 
subscribing at LATEST.
+            return (position.getLedgerId() == lac.getLedgerId() && 
position.getEntryId() <= lac.getEntryId() + 1)
+                    || position.getEntryId() == 0;
+        } else if (position.getLedgerId() == lac.getLedgerId()) {
+            // The ledger which maintains LAC was closed, and there is an 
empty current ledger.
+            // If entry id is larger than LAC, it should be "{current_ledger: 
0}".
+            return position.getEntryId() <= lac.getEntryId();
         } else {
             // Look in the ledgers map
             LedgerInfo ls = ledgers.get(position.getLedgerId());
 
             if (ls == null) {
-                if (position.getLedgerId() < last.getLedgerId()) {
+                if (position.getLedgerId() < lac.getLedgerId()) {
                     // Pointing to a non-existing ledger that is older than 
the current ledger is invalid
                     return false;
                 } else {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 51e56158cad..77216ce2e45 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -70,7 +70,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
     private void recoverCursor(PositionImpl mdPosition) {
         Pair<PositionImpl, Long> lastEntryAndCounter = 
ledger.getLastPositionAndCounter();
         this.readPosition = isReadCompacted() ? mdPosition.getNext() : 
ledger.getNextValidPosition(mdPosition);
-        markDeletePosition = mdPosition;
+        markDeletePosition = ledger.getPreviousPosition(this.readPosition);
 
         // Initialize the counter such that the difference between the 
messages written on the ML and the
         // messagesConsumed is equal to the current backlog (negated).
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 6762b7d8a04..223e9f47355 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -27,6 +27,8 @@ import static org.testng.AssertJUnit.assertTrue;
 import java.lang.reflect.Field;
 import java.util.UUID;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -34,6 +36,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -45,6 +48,8 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandFlow;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
@@ -382,4 +387,215 @@ public class NonDurableSubscriptionTest extends 
ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName);
     }
+
+    @Test
+    public void testInitReaderAtSpecifiedPosition() throws Exception {
+        String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, "s0", MessageId.earliest);
+
+        // Trigger 5 ledgers.
+        ArrayList<Long> ledgers = new ArrayList<>();
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        for (int i = 0; i < 5; i++) {
+            MessageIdImpl msgId = (MessageIdImpl) producer.send("1");
+            ledgers.add(msgId.getLedgerId());
+            admin.topics().unload(topicName);
+        }
+        producer.close();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        LedgerHandle currentLedger = WhiteboxImpl.getInternalState(ml, 
"currentLedger");
+        log.info("currentLedger: {}", currentLedger.getId());
+
+        // Less than the first ledger, and entry id is "-1".
+        log.info("start test s1");
+        String s1 = "s1";
+        MessageIdImpl startMessageId1 = new MessageIdImpl(ledgers.get(0) - 1, 
-1, -1);
+        Reader<String> reader1 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s1)
+                .receiverQueueSize(0).startMessageId(startMessageId1).create();
+        ManagedLedgerInternalStats.CursorStats cursor1 = 
admin.topics().getInternalStats(topicName).cursors.get(s1);
+        log.info("cursor1 readPosition: {}, markDeletedPosition: {}", 
cursor1.readPosition, cursor1.markDeletePosition);
+        PositionImpl p1 = parseReadPosition(cursor1);
+        assertEquals(p1.getLedgerId(), ledgers.get(0));
+        assertEquals(p1.getEntryId(), 0);
+        reader1.close();
+
+        // Less than the first ledger, and entry id is Long.MAX_VALUE.
+        log.info("start test s2");
+        String s2 = "s2";
+        MessageIdImpl startMessageId2 = new MessageIdImpl(ledgers.get(0) - 1, 
Long.MAX_VALUE, -1);
+        Reader<String> reader2 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s2)
+                .receiverQueueSize(0).startMessageId(startMessageId2).create();
+        ManagedLedgerInternalStats.CursorStats cursor2 = 
admin.topics().getInternalStats(topicName).cursors.get(s2);
+        log.info("cursor2 readPosition: {}, markDeletedPosition: {}", 
cursor2.readPosition, cursor2.markDeletePosition);
+        PositionImpl p2 = parseReadPosition(cursor2);
+        assertEquals(p2.getLedgerId(), ledgers.get(0));
+        assertEquals(p2.getEntryId(), 0);
+        reader2.close();
+
+        // Larger than the latest ledger, and entry id is "-1".
+        log.info("start test s3");
+        String s3 = "s3";
+        MessageIdImpl startMessageId3 = new 
MessageIdImpl(currentLedger.getId() + 1, -1, -1);
+        Reader<String> reader3 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s3)
+                .receiverQueueSize(0).startMessageId(startMessageId3).create();
+        ManagedLedgerInternalStats.CursorStats cursor3 = 
admin.topics().getInternalStats(topicName).cursors.get(s3);
+        log.info("cursor3 readPosition: {}, markDeletedPosition: {}", 
cursor3.readPosition, cursor3.markDeletePosition);
+        PositionImpl p3 = parseReadPosition(cursor3);
+        assertEquals(p3.getLedgerId(), currentLedger.getId());
+        assertEquals(p3.getEntryId(), 0);
+        reader3.close();
+
+        // Larger than the latest ledger, and entry id is Long.MAX_VALUE.
+        log.info("start test s4");
+        String s4 = "s4";
+        MessageIdImpl startMessageId4 = new 
MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1);
+        Reader<String> reader4 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s4)
+                .receiverQueueSize(0).startMessageId(startMessageId4).create();
+        ManagedLedgerInternalStats.CursorStats cursor4 = 
admin.topics().getInternalStats(topicName).cursors.get(s4);
+        log.info("cursor4 readPosition: {}, markDeletedPosition: {}", 
cursor4.readPosition, cursor4.markDeletePosition);
+        PositionImpl p4 = parseReadPosition(cursor4);
+        assertEquals(p4.getLedgerId(), currentLedger.getId());
+        assertEquals(p4.getEntryId(), 0);
+        reader4.close();
+
+        // Ledger id and entry id both are Long.MAX_VALUE.
+        log.info("start test s5");
+        String s5 = "s5";
+        MessageIdImpl startMessageId5 = new 
MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1);
+        Reader<String> reader5 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s5)
+                .receiverQueueSize(0).startMessageId(startMessageId5).create();
+        ManagedLedgerInternalStats.CursorStats cursor5 = 
admin.topics().getInternalStats(topicName).cursors.get(s5);
+        log.info("cursor5 readPosition: {}, markDeletedPosition: {}", 
cursor5.readPosition, cursor5.markDeletePosition);
+        PositionImpl p5 = parseReadPosition(cursor5);
+        assertEquals(p5.getLedgerId(), currentLedger.getId());
+        assertEquals(p5.getEntryId(), 0);
+        reader5.close();
+
+        // Ledger id equals LAC, and entry id is "-1".
+        log.info("start test s6");
+        String s6 = "s6";
+        MessageIdImpl startMessageId6 = new 
MessageIdImpl(ledgers.get(ledgers.size() - 1), -1, -1);
+        Reader<String> reader6 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s6)
+                .receiverQueueSize(0).startMessageId(startMessageId6).create();
+        ManagedLedgerInternalStats.CursorStats cursor6 = 
admin.topics().getInternalStats(topicName).cursors.get(s6);
+        log.info("cursor6 readPosition: {}, markDeletedPosition: {}", 
cursor6.readPosition, cursor6.markDeletePosition);
+        PositionImpl p6 = parseReadPosition(cursor6);
+        assertEquals(p6.getLedgerId(), ledgers.get(ledgers.size() - 1));
+        assertEquals(p6.getEntryId(), 0);
+        reader6.close();
+
+        // Ledger id equals LAC, and entry id is Long.MAX_VALUE.
+        log.info("start test s7");
+        String s7 = "s7";
+        MessageIdImpl startMessageId7 = new 
MessageIdImpl(ledgers.get(ledgers.size() - 1), Long.MAX_VALUE, -1);
+        Reader<String> reader7 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s7)
+                .receiverQueueSize(0).startMessageId(startMessageId7).create();
+        ManagedLedgerInternalStats.CursorStats cursor7 = 
admin.topics().getInternalStats(topicName).cursors.get(s7);
+        log.info("cursor7 readPosition: {}, markDeletedPosition: {}", 
cursor7.readPosition, cursor7.markDeletePosition);
+        PositionImpl p7 = parseReadPosition(cursor7);
+        assertEquals(p7.getLedgerId(), currentLedger.getId());
+        assertEquals(p7.getEntryId(), 0);
+        reader7.close();
+
+        // A middle ledger id, and entry id is "-1".
+        log.info("start test s8");
+        String s8 = "s8";
+        MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, 
-1);
+        Reader<String> reader8 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8)
+                .receiverQueueSize(0).startMessageId(startMessageId8).create();
+        ManagedLedgerInternalStats.CursorStats cursor8 = 
admin.topics().getInternalStats(topicName).cursors.get(s8);
+        log.info("cursor8 readPosition: {}, markDeletedPosition: {}", 
cursor8.readPosition, cursor8.markDeletePosition);
+        PositionImpl p8 = parseReadPosition(cursor8);
+        assertEquals(p8.getLedgerId(), ledgers.get(2));
+        assertEquals(p8.getEntryId(), 0);
+        reader8.close();
+
+        // A middle ledger id, and entry id is Long.MAX_VALUE.
+        log.info("start test s9");
+        String s9 = "s9";
+        MessageIdImpl startMessageId9 = new MessageIdImpl(ledgers.get(2), 
Long.MAX_VALUE, -1);
+        Reader<String> reader9 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s9)
+                .receiverQueueSize(0).startMessageId(startMessageId9).create();
+        ManagedLedgerInternalStats.CursorStats cursor9 = 
admin.topics().getInternalStats(topicName).cursors.get(s9);
+        log.info("cursor9 readPosition: {}, markDeletedPosition: {}", 
cursor9.readPosition,
+                cursor9.markDeletePosition);
+        PositionImpl p9 = parseReadPosition(cursor9);
+        assertEquals(p9.getLedgerId(), ledgers.get(3));
+        assertEquals(p9.getEntryId(), 0);
+        reader9.close();
+
+        // A middle ledger id, and entry id equals the max 
entry id of this ledger.
+        log.info("start test s10");
+        String s10 = "s10";
+        MessageIdImpl startMessageId10 = new MessageIdImpl(ledgers.get(2), 0, 
-1);
+        Reader<String> reader10 = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s10)
+                
.receiverQueueSize(0).startMessageId(startMessageId10).create();
+        ManagedLedgerInternalStats.CursorStats cursor10 = 
admin.topics().getInternalStats(topicName).cursors.get(s10);
+        log.info("cursor10 readPosition: {}, markDeletedPosition: {}", 
cursor10.readPosition, cursor10.markDeletePosition);
+        PositionImpl p10 = parseReadPosition(cursor10);
+        assertEquals(p10.getLedgerId(), ledgers.get(2));
+        assertEquals(p10.getEntryId(), 0);
+        reader10.close();
+
+        // cleanup
+        admin.topics().delete(topicName, false);
+    }
+
+    private PositionImpl 
parseReadPosition(ManagedLedgerInternalStats.CursorStats cursorStats) {
+        String[] ledgerIdAndEntryId = cursorStats.readPosition.split(":");
+        return PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), 
Long.valueOf(ledgerIdAndEntryId[1]));
+    }
+
+    @Test
+    public void testReaderInitAtDeletedPosition() throws Exception {
+        String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        MessageIdImpl msgIdInDeletedLedger4 = (MessageIdImpl) 
producer.send("4");
+        MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) 
producer.send("5");
+
+        // Trigger a trim ledgers task, and verify trim ledgers successful.
+        admin.topics().unload(topicName);
+        trimLedgers(topicName);
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = 
admin.topics().getInternalStats(topicName).ledgers;
+        assertEquals(ledgers.size(), 1);
+        assertNotEquals(ledgers.get(0).ledgerId, 
msgIdInDeletedLedger5.getLedgerId());
+
+        // Start a reader at a deleted ledger.
+        MessageIdImpl startMessageId =
+                new MessageIdImpl(msgIdInDeletedLedger4.getLedgerId(), 
msgIdInDeletedLedger4.getEntryId(), -1);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName("s1")
+                .startMessageId(startMessageId).create();
+        Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+        Assert.assertNull(msg1);
+
+        // Verify backlog and markDeletePosition is correct.
+        Awaitility.await().untilAsserted(() -> {
+            SubscriptionStats subscriptionStats = admin.topics()
+                    .getStats(topicName, true, true, 
true).getSubscriptions().get("s1");
+            log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+            assertEquals(subscriptionStats.getMsgBacklog(), 0);
+            ManagedLedgerInternalStats.CursorStats cursorStats =
+                    
admin.topics().getInternalStats(topicName).cursors.get("s1");
+            String[] ledgerIdAndEntryId = 
cursorStats.markDeletePosition.split(":");
+            PositionImpl actMarkDeletedPos =
+                    PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), 
Long.valueOf(ledgerIdAndEntryId[1]));
+            PositionImpl expectedMarkDeletedPos =
+                    PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), 
msgIdInDeletedLedger5.getEntryId());
+            log.info("Expected mark deleted position: {}", 
expectedMarkDeletedPos);
+            log.info("Actual mark deleted position: {}", 
cursorStats.markDeletePosition);
+            assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 
0);
+        });
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
 }

Reply via email to