This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 0e2e571c1e5 [fix][broker] Fix seeking by timestamp can be reset the
cursor position to earliest (#23919)
0e2e571c1e5 is described below
commit 0e2e571c1e5c2a6d9979c3120e62b7554b8f94a4
Author: 道君 <[email protected]>
AuthorDate: Tue Feb 11 14:25:13 2025 +0800
[fix][broker] Fix seeking by timestamp can be reset the cursor position to
earliest (#23919)
(cherry picked from commit 59cf36f217daf8ab7b2665dc6b377c0375c8e87a)
---
.../persistent/PersistentMessageFinder.java | 19 +---
.../service/PersistentMessageFinderTest.java | 6 +-
.../broker/service/SubscriptionSeekTest.java | 118 +++++++++++++++++++++
3 files changed, 123 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 5a4631cf205..e780f167284 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -91,6 +92,7 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
}
}
+ @VisibleForTesting
public static Pair<Position, Position>
getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
Position
lastConfirmedEntry, long targetTimestamp,
int
ledgerCloseTimestampMaxClockSkewMillis) {
@@ -105,15 +107,11 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
Position start = null;
Position end = null;
- LedgerInfo secondToLastLedgerInfo = null;
- LedgerInfo lastLedgerInfo = null;
for (LedgerInfo info : ledgerInfos) {
if (!info.hasTimestamp()) {
// unexpected case, don't set start and end
return Pair.of(null, null);
}
- secondToLastLedgerInfo = lastLedgerInfo;
- lastLedgerInfo = info;
long closeTimestamp = info.getTimestamp();
// For an open ledger, closeTimestamp is 0
if (closeTimestamp == 0) {
@@ -128,19 +126,6 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
break;
}
}
- // If the second-to-last ledger's close timestamp is less than the
target timestamp, then start from the
- // first entry of the last ledger when there are confirmed entries in
the ledger
- if (lastLedgerInfo != null && secondToLastLedgerInfo != null
- && secondToLastLedgerInfo.getTimestamp() > 0
- && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin)
{
- Position firstPositionInLedger =
PositionFactory.create(lastLedgerInfo.getLedgerId(), 0);
- if (lastConfirmedEntry != null
- && lastConfirmedEntry.compareTo(firstPositionInLedger) >=
0) {
- start = firstPositionInLedger;
- } else {
- start = lastConfirmedEntry;
- }
- }
return Pair.of(start, end);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6f2f1f3a1a2..62e27eaea41 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -636,7 +636,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertNotNull(range);
assertNotNull(range.getLeft());
assertNull(range.getRight());
- assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+ assertEquals(range.getLeft(), PositionFactory.create(2, 0));
}
@Test
@@ -654,7 +654,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertNotNull(range);
assertNotNull(range.getLeft());
assertNull(range.getRight());
- assertEquals(range.getLeft(), PositionFactory.create(2, 9));
+ assertEquals(range.getLeft(), PositionFactory.create(2, 0));
}
@Test
@@ -689,7 +689,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertNotNull(range);
assertNotNull(range.getLeft());
assertNotNull(range.getRight());
- assertEquals(range.getLeft(), PositionFactory.create(3, 0));
+ assertEquals(range.getLeft(), PositionFactory.create(2, 0));
assertEquals(range.getRight(), PositionFactory.create(3, 9));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 582d10294a5..3a9c5c43f1c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -27,8 +27,10 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -38,6 +40,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -50,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -61,6 +70,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -72,6 +82,10 @@ public class SubscriptionSeekTest extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
+ conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ conf.setManagedLedgerMaxEntriesPerLedger(10);
+ conf.setDefaultRetentionSizeInMB(100);
+ conf.setDefaultRetentionTimeInMinutes(100);
super.baseSetup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}
@@ -489,6 +503,110 @@ public class SubscriptionSeekTest extends BrokerTestBase {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
}
+ @Test(timeOut = 30_000)
+ public void testSeekByTimestamp() throws Exception {
+ String topicName = "persistent://prop/use/ns-abc/testSeekByTimestamp";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, "my-sub",
MessageId.earliest);
+
+ @Cleanup
+ Producer<String> producer =
+
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ for (int i = 0; i < 25; i++) {
+ producer.send(("message-" + i));
+ Thread.sleep(10);
+ }
+
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+
+ Map<Long, MessageId> timestampToMessageId = new HashMap<>();
+ @Cleanup
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.earliest).create();
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext();
+ timestampToMessageId.put(message.getPublishTime(),
message.getMessageId());
+ }
+
+ Assert.assertEquals(timestampToMessageId.size(), 25);
+
+ PersistentSubscription subscription = topic.getSubscription("my-sub");
+ ManagedCursor cursor = subscription.getCursor();
+
+ @Cleanup
+ org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+
.topic(topicName).subscriptionName("my-sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ long[] timestamps =
timestampToMessageId.keySet().stream().mapToLong(Long::longValue).toArray();
+ ArrayUtils.shuffle(timestamps);
+ for (long timestamp : timestamps) {
+ MessageIdImpl messageId = (MessageIdImpl)
timestampToMessageId.get(timestamp);
+ consumer.seek(timestamp);
+ Position readPosition = cursor.getReadPosition();
+ Assert.assertEquals(readPosition.getLedgerId(),
messageId.getLedgerId());
+ Assert.assertEquals(readPosition.getEntryId(),
messageId.getEntryId());
+ }
+ }
+
+ @Test(timeOut = 30_000)
+ public void testSeekByTimestampWithLedgerTrim() throws Exception {
+ String topicName =
"persistent://prop/use/ns-abc/testSeekByTimestampWithLedgerTrim";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, "my-sub",
MessageId.earliest);
+
+ @Cleanup
+ Producer<String> producer =
+
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ for (int i = 0; i < 25; i++) {
+ producer.send(("message-" + i));
+ Thread.sleep(10);
+ }
+
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ ManagedLedger ledger = topic.getManagedLedger();
+ ManagedLedgerConfig config = ledger.getConfig();
+ config.setRetentionTime(0, TimeUnit.SECONDS);
+ config.setRetentionSizeInMB(0);
+
+ Map<Long, MessageId> timestampToMessageId = new HashMap<>();
+ @Cleanup
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.earliest).create();
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext();
+ timestampToMessageId.put(message.getPublishTime(),
message.getMessageId());
+ }
+
+ Assert.assertEquals(timestampToMessageId.size(), 25);
+
+ PersistentSubscription subscription = topic.getSubscription("my-sub");
+ ManagedCursor cursor = subscription.getCursor();
+
+ @Cleanup
+ org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+
.topic(topicName).subscriptionName("my-sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ long[] timestamps =
timestampToMessageId.keySet().stream().mapToLong(Long::longValue).toArray();
+ ArrayUtils.shuffle(timestamps);
+ boolean enterLedgerTrimmedBranch = false;
+ for (long timestamp : timestamps) {
+ MessageIdImpl messageId = (MessageIdImpl)
timestampToMessageId.get(timestamp);
+ consumer.seek(timestamp);
+ CompletableFuture<?> trimFuture = new CompletableFuture<>();
+ ledger.trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.get();
+ Position readPosition = cursor.getReadPosition();
+ Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
firstLedger = ledger.getLedgersInfo().firstEntry();
+ Assert.assertNotNull(firstLedger);
+ if (firstLedger.getKey() > messageId.getLedgerId()) {
+ Assert.assertEquals(readPosition.getLedgerId(),
firstLedger.getKey());
+ Assert.assertEquals(readPosition.getEntryId(), 0);
+ enterLedgerTrimmedBranch = true;
+ } else {
+ Assert.assertEquals(readPosition.getLedgerId(),
messageId.getLedgerId());
+ Assert.assertEquals(readPosition.getEntryId(),
messageId.getEntryId());
+ }
+ }
+ // NOTE: this assertion can be flaky, because the order produced by
`ArrayUtils.shuffle(timestamps);` is random, so the trimmed-ledger branch is not guaranteed to be reached.
+ Assert.assertTrue(enterLedgerTrimmedBranch);
+ }
+
@Test
public void testSeekTimeByFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" +
UUID.randomUUID();