This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new dd28bb41f44 [fix] [ml] Reader can set read-pos to a deleted ledger
(#21248)
dd28bb41f44 is described below
commit dd28bb41f44de7d88aec39c79cc8014d64ad4476
Author: fengyubiao <[email protected]>
AuthorDate: Sun Oct 8 11:25:02 2023 +0800
[fix] [ml] Reader can set read-pos to a deleted ledger (#21248)
### Motivation
After trimming ledgers, the variable `lastConfirmedEntry` of the managed
ledger might still point to a deleted ledger (the latest ledger that contained data).
There is a bug that makes Pulsar allow users to set the start read position
to a nonexistent or deleted ledger when creating a subscription. This
makes the `backlog` and `markDeletedPosition` wrong.
### Modifications
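Fix the bug:
- `ManagedLedgerImpl.isValidPosition` now rejects any position whose ledger is not in the `ledgers` map.
- `NonDurableCursorImpl.recoverCursor` sets `markDeletePosition` to the position just before the resolved read position.
- The precise backlog calculation in `ManagedCursorImpl` returns 0 when `markDeletePosition` is at or after the last position of the managed ledger.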
(cherry picked from commit 4ee5cd78147890ce3c23092edbfc11370972728e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 21 +-
.../mledger/impl/NonDurableCursorImpl.java | 2 +-
.../client/api/NonDurableSubscriptionTest.java | 216 +++++++++++++++++++++
4 files changed, 234 insertions(+), 8 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9063602dd70..3a4d371019c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1107,6 +1107,9 @@ public class ManagedCursorImpl implements ManagedCursor {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
+ if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
+ return 0;
+ }
return getNumberOfEntries(Range.openClosed(markDeletePosition,
ledger.getLastPosition()));
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0f65a1a089c..d51b48bdda5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3630,23 +3630,30 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
* @return true if the position is valid, false otherwise
*/
public boolean isValidPosition(PositionImpl position) {
- PositionImpl last = lastConfirmedEntry;
+ PositionImpl lac = lastConfirmedEntry;
if (log.isDebugEnabled()) {
- log.debug("IsValid position: {} -- last: {}", position, last);
+ log.debug("IsValid position: {} -- last: {}", position, lac);
}
- if (position.getEntryId() < 0) {
+ if (!ledgers.containsKey(position.getLedgerId())){
return false;
- } else if (position.getLedgerId() > last.getLedgerId()) {
+ } else if (position.getEntryId() < 0) {
return false;
- } else if (position.getLedgerId() == last.getLedgerId()) {
- return position.getEntryId() <= (last.getEntryId() + 1);
+ } else if (currentLedger != null && position.getLedgerId() ==
currentLedger.getId()) {
+ // If current ledger is empty, the largest read position can be
"{current_ledger: 0}".
+ // Else, the read position can be set to "{LAC + 1}" when
subscribe at LATEST,
+ return (position.getLedgerId() == lac.getLedgerId() &&
position.getEntryId() <= lac.getEntryId() + 1)
+ || position.getEntryId() == 0;
+ } else if (position.getLedgerId() == lac.getLedgerId()) {
+ // The ledger which maintains LAC was closed, and there is an
empty current ledger.
+ // If entry id is larger than LAC, it should be "{current_ledger:
0}".
+ return position.getEntryId() <= lac.getEntryId();
} else {
// Look in the ledgers map
LedgerInfo ls = ledgers.get(position.getLedgerId());
if (ls == null) {
- if (position.getLedgerId() < last.getLedgerId()) {
+ if (position.getLedgerId() < lac.getLedgerId()) {
// Pointing to a non-existing ledger that is older than
the current ledger is invalid
return false;
} else {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 51e56158cad..77216ce2e45 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -70,7 +70,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter =
ledger.getLastPositionAndCounter();
this.readPosition = isReadCompacted() ? mdPosition.getNext() :
ledger.getNextValidPosition(mdPosition);
- markDeletePosition = mdPosition;
+ markDeletePosition = ledger.getPreviousPosition(this.readPosition);
// Initialize the counter such that the difference between the
messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 6762b7d8a04..223e9f47355 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -27,6 +27,8 @@ import static org.testng.AssertJUnit.assertTrue;
import java.lang.reflect.Field;
import java.util.UUID;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,6 +36,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
@@ -45,6 +48,8 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
@@ -382,4 +387,215 @@ public class NonDurableSubscriptionTest extends
ProducerConsumerBase {
producer.close();
admin.topics().delete(topicName);
}
+
+ @Test
+ public void testInitReaderAtSpecifiedPosition() throws Exception {
+ String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, "s0", MessageId.earliest);
+
+ // Trigger 5 ledgers.
+ ArrayList<Long> ledgers = new ArrayList<>();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ for (int i = 0; i < 5; i++) {
+ MessageIdImpl msgId = (MessageIdImpl) producer.send("1");
+ ledgers.add(msgId.getLedgerId());
+ admin.topics().unload(topicName);
+ }
+ producer.close();
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ LedgerHandle currentLedger = WhiteboxImpl.getInternalState(ml,
"currentLedger");
+ log.info("currentLedger: {}", currentLedger.getId());
+
+ // Less than the first ledger, and entry id is "-1".
+ log.info("start test s1");
+ String s1 = "s1";
+ MessageIdImpl startMessageId1 = new MessageIdImpl(ledgers.get(0) - 1,
-1, -1);
+ Reader<String> reader1 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s1)
+ .receiverQueueSize(0).startMessageId(startMessageId1).create();
+ ManagedLedgerInternalStats.CursorStats cursor1 =
admin.topics().getInternalStats(topicName).cursors.get(s1);
+ log.info("cursor1 readPosition: {}, markDeletedPosition: {}",
cursor1.readPosition, cursor1.markDeletePosition);
+ PositionImpl p1 = parseReadPosition(cursor1);
+ assertEquals(p1.getLedgerId(), ledgers.get(0));
+ assertEquals(p1.getEntryId(), 0);
+ reader1.close();
+
+ // Less than the first ledger, and entry id is Long.MAX_VALUE.
+ log.info("start test s2");
+ String s2 = "s2";
+ MessageIdImpl startMessageId2 = new MessageIdImpl(ledgers.get(0) - 1,
Long.MAX_VALUE, -1);
+ Reader<String> reader2 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s2)
+ .receiverQueueSize(0).startMessageId(startMessageId2).create();
+ ManagedLedgerInternalStats.CursorStats cursor2 =
admin.topics().getInternalStats(topicName).cursors.get(s2);
+ log.info("cursor2 readPosition: {}, markDeletedPosition: {}",
cursor2.readPosition, cursor2.markDeletePosition);
+ PositionImpl p2 = parseReadPosition(cursor2);
+ assertEquals(p2.getLedgerId(), ledgers.get(0));
+ assertEquals(p2.getEntryId(), 0);
+ reader2.close();
+
+ // Larger than the latest ledger, and entry id is "-1".
+ log.info("start test s3");
+ String s3 = "s3";
+ MessageIdImpl startMessageId3 = new
MessageIdImpl(currentLedger.getId() + 1, -1, -1);
+ Reader<String> reader3 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s3)
+ .receiverQueueSize(0).startMessageId(startMessageId3).create();
+ ManagedLedgerInternalStats.CursorStats cursor3 =
admin.topics().getInternalStats(topicName).cursors.get(s3);
+ log.info("cursor3 readPosition: {}, markDeletedPosition: {}",
cursor3.readPosition, cursor3.markDeletePosition);
+ PositionImpl p3 = parseReadPosition(cursor3);
+ assertEquals(p3.getLedgerId(), currentLedger.getId());
+ assertEquals(p3.getEntryId(), 0);
+ reader3.close();
+
+ // Larger than the latest ledger, and entry id is Long.MAX_VALUE.
+ log.info("start test s4");
+ String s4 = "s4";
+ MessageIdImpl startMessageId4 = new
MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1);
+ Reader<String> reader4 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s4)
+ .receiverQueueSize(0).startMessageId(startMessageId4).create();
+ ManagedLedgerInternalStats.CursorStats cursor4 =
admin.topics().getInternalStats(topicName).cursors.get(s4);
+ log.info("cursor4 readPosition: {}, markDeletedPosition: {}",
cursor4.readPosition, cursor4.markDeletePosition);
+ PositionImpl p4 = parseReadPosition(cursor4);
+ assertEquals(p4.getLedgerId(), currentLedger.getId());
+ assertEquals(p4.getEntryId(), 0);
+ reader4.close();
+
+ // Ledger id and entry id both are Long.MAX_VALUE.
+ log.info("start test s5");
+ String s5 = "s5";
+ MessageIdImpl startMessageId5 = new
MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1);
+ Reader<String> reader5 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s5)
+ .receiverQueueSize(0).startMessageId(startMessageId5).create();
+ ManagedLedgerInternalStats.CursorStats cursor5 =
admin.topics().getInternalStats(topicName).cursors.get(s5);
+ log.info("cursor5 readPosition: {}, markDeletedPosition: {}",
cursor5.readPosition, cursor5.markDeletePosition);
+ PositionImpl p5 = parseReadPosition(cursor5);
+ assertEquals(p5.getLedgerId(), currentLedger.getId());
+ assertEquals(p5.getEntryId(), 0);
+ reader5.close();
+
+ // Ledger id equals LAC, and entry id is "-1".
+ log.info("start test s6");
+ String s6 = "s6";
+ MessageIdImpl startMessageId6 = new
MessageIdImpl(ledgers.get(ledgers.size() - 1), -1, -1);
+ Reader<String> reader6 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s6)
+ .receiverQueueSize(0).startMessageId(startMessageId6).create();
+ ManagedLedgerInternalStats.CursorStats cursor6 =
admin.topics().getInternalStats(topicName).cursors.get(s6);
+ log.info("cursor6 readPosition: {}, markDeletedPosition: {}",
cursor6.readPosition, cursor6.markDeletePosition);
+ PositionImpl p6 = parseReadPosition(cursor6);
+ assertEquals(p6.getLedgerId(), ledgers.get(ledgers.size() - 1));
+ assertEquals(p6.getEntryId(), 0);
+ reader6.close();
+
+ // Ledger id equals LAC, and entry id is Long.MAX_VALUE.
+ log.info("start test s7");
+ String s7 = "s7";
+ MessageIdImpl startMessageId7 = new
MessageIdImpl(ledgers.get(ledgers.size() - 1), Long.MAX_VALUE, -1);
+ Reader<String> reader7 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s7)
+ .receiverQueueSize(0).startMessageId(startMessageId7).create();
+ ManagedLedgerInternalStats.CursorStats cursor7 =
admin.topics().getInternalStats(topicName).cursors.get(s7);
+ log.info("cursor7 readPosition: {}, markDeletedPosition: {}",
cursor7.readPosition, cursor7.markDeletePosition);
+ PositionImpl p7 = parseReadPosition(cursor7);
+ assertEquals(p7.getLedgerId(), currentLedger.getId());
+ assertEquals(p7.getEntryId(), 0);
+ reader7.close();
+
+ // A middle ledger id, and entry id is "0".
+ log.info("start test s8");
+ String s8 = "s8";
+ MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0,
-1);
+ Reader<String> reader8 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8)
+ .receiverQueueSize(0).startMessageId(startMessageId8).create();
+ ManagedLedgerInternalStats.CursorStats cursor8 =
admin.topics().getInternalStats(topicName).cursors.get(s8);
+ log.info("cursor8 readPosition: {}, markDeletedPosition: {}",
cursor8.readPosition, cursor8.markDeletePosition);
+ PositionImpl p8 = parseReadPosition(cursor8);
+ assertEquals(p8.getLedgerId(), ledgers.get(2));
+ assertEquals(p8.getEntryId(), 0);
+ reader8.close();
+
+ // A middle ledger id, and entry id is Long.MAX_VALUE.
+ log.info("start test s9");
+ String s9 = "s9";
+ MessageIdImpl startMessageId9 = new MessageIdImpl(ledgers.get(2),
Long.MAX_VALUE, -1);
+ Reader<String> reader9 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s9)
+ .receiverQueueSize(0).startMessageId(startMessageId9).create();
+ ManagedLedgerInternalStats.CursorStats cursor9 =
admin.topics().getInternalStats(topicName).cursors.get(s9);
+ log.info("cursor9 readPosition: {}, markDeletedPosition: {}",
cursor9.readPosition,
+ cursor9.markDeletePosition);
+ PositionImpl p9 = parseReadPosition(cursor9);
+ assertEquals(p9.getLedgerId(), ledgers.get(3));
+ assertEquals(p9.getEntryId(), 0);
+ reader9.close();
+
+ // A middle ledger id, and entry id equals the max
entry id of this ledger.
+ log.info("start test s10");
+ String s10 = "s10";
+ MessageIdImpl startMessageId10 = new MessageIdImpl(ledgers.get(2), 0,
-1);
+ Reader<String> reader10 =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s10)
+
.receiverQueueSize(0).startMessageId(startMessageId10).create();
+ ManagedLedgerInternalStats.CursorStats cursor10 =
admin.topics().getInternalStats(topicName).cursors.get(s10);
+ log.info("cursor10 readPosition: {}, markDeletedPosition: {}",
cursor10.readPosition, cursor10.markDeletePosition);
+ PositionImpl p10 = parseReadPosition(cursor10);
+ assertEquals(p10.getLedgerId(), ledgers.get(2));
+ assertEquals(p10.getEntryId(), 0);
+ reader10.close();
+
+ // cleanup
+ admin.topics().delete(topicName, false);
+ }
+
+ private PositionImpl
parseReadPosition(ManagedLedgerInternalStats.CursorStats cursorStats) {
+ String[] ledgerIdAndEntryId = cursorStats.readPosition.split(":");
+ return PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]),
Long.valueOf(ledgerIdAndEntryId[1]));
+ }
+
+ @Test
+ public void testReaderInitAtDeletedPosition() throws Exception {
+ String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ producer.send("1");
+ producer.send("2");
+ producer.send("3");
+ MessageIdImpl msgIdInDeletedLedger4 = (MessageIdImpl)
producer.send("4");
+ MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl)
producer.send("5");
+
+ // Trigger a trim ledgers task, and verify trim ledgers successful.
+ admin.topics().unload(topicName);
+ trimLedgers(topicName);
+ List<ManagedLedgerInternalStats.LedgerInfo> ledgers =
admin.topics().getInternalStats(topicName).ledgers;
+ assertEquals(ledgers.size(), 1);
+ assertNotEquals(ledgers.get(0).ledgerId,
msgIdInDeletedLedger5.getLedgerId());
+
+ // Start a reader at a deleted ledger.
+ MessageIdImpl startMessageId =
+ new MessageIdImpl(msgIdInDeletedLedger4.getLedgerId(),
msgIdInDeletedLedger4.getEntryId(), -1);
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName("s1")
+ .startMessageId(startMessageId).create();
+ Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+ Assert.assertNull(msg1);
+
+ // Verify backlog and markDeletePosition is correct.
+ Awaitility.await().untilAsserted(() -> {
+ SubscriptionStats subscriptionStats = admin.topics()
+ .getStats(topicName, true, true,
true).getSubscriptions().get("s1");
+ log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+ assertEquals(subscriptionStats.getMsgBacklog(), 0);
+ ManagedLedgerInternalStats.CursorStats cursorStats =
+
admin.topics().getInternalStats(topicName).cursors.get("s1");
+ String[] ledgerIdAndEntryId =
cursorStats.markDeletePosition.split(":");
+ PositionImpl actMarkDeletedPos =
+ PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]),
Long.valueOf(ledgerIdAndEntryId[1]));
+ PositionImpl expectedMarkDeletedPos =
+ PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(),
msgIdInDeletedLedger5.getEntryId());
+ log.info("Expected mark deleted position: {}",
expectedMarkDeletedPos);
+ log.info("Actual mark deleted position: {}",
cursorStats.markDeletePosition);
+ assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >=
0);
+ });
+
+ // cleanup.
+ reader.close();
+ producer.close();
+ admin.topics().delete(topicName, false);
+ }
}