This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5cb12a57d6f [fix][broker] Fix issue with GetMessageIdByTimestamp can't
find match messageId from compacted ledger (#21600)
5cb12a57d6f is described below
commit 5cb12a57d6fa9ee2a611c5b1ef7db780fb57ade5
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Jan 20 09:35:56 2024 +0800
[fix][broker] Fix issue with GetMessageIdByTimestamp can't find match
messageId from compacted ledger (#21600)
---
.../broker/admin/impl/PersistentTopicsBase.java | 60 ++++++++++++++-----
.../pulsar/compaction/CompactedTopicImpl.java | 50 ++++++++++++++++
.../compaction/PulsarTopicCompactionService.java | 56 +-----------------
.../pulsar/broker/admin/PersistentTopicsTest.java | 67 ++++++++++++++++++++++
4 files changed, 166 insertions(+), 67 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4ae765a0caa..b7a654cc0a4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2885,28 +2885,62 @@ public class PersistentTopicsBase extends AdminResource
{
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a non-persistent topic
is not allowed");
}
- ManagedLedger ledger = ((PersistentTopic)
topic).getManagedLedger();
- return ledger.asyncFindPosition(entry -> {
+ final PersistentTopic persistentTopic = (PersistentTopic)
topic;
+
+ return
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry
-> {
+ if (lastEntry == null) {
+ return findMessageIdByPublishTime(timestamp,
persistentTopic.getManagedLedger());
+ }
+ MessageMetadata metadata;
+ Position position = lastEntry.getPosition();
try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
- return
MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
- } catch (Exception e) {
- log.error("[{}] Error deserializing message for
message position find", topicName, e);
+ metadata =
Commands.parseMessageMetadata(lastEntry.getDataBuffer());
} finally {
- entry.release();
+ lastEntry.release();
}
- return false;
- }).thenApply(position -> {
- if (position == null) {
- return null;
+ if (timestamp == metadata.getPublishTime()) {
+ return CompletableFuture.completedFuture(new
MessageIdImpl(position.getLedgerId(),
+ position.getEntryId(),
topicName.getPartitionIndex()));
+ } else if (timestamp < metadata.getPublishTime()) {
+ return
persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
+ .thenApply(compactedEntry -> {
+ try {
+ return new
MessageIdImpl(compactedEntry.getLedgerId(),
+ compactedEntry.getEntryId(),
topicName.getPartitionIndex());
+ } finally {
+ compactedEntry.release();
+ }
+ });
} else {
- return new MessageIdImpl(position.getLedgerId(),
position.getEntryId(),
- topicName.getPartitionIndex());
+ return findMessageIdByPublishTime(timestamp,
persistentTopic.getManagedLedger());
}
});
});
}
+ private CompletableFuture<MessageId> findMessageIdByPublishTime(long
timestamp, ManagedLedger managedLedger) {
+ return managedLedger.asyncFindPosition(entry -> {
+ try {
+ long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp,
timestamp);
+ } catch (Exception e) {
+ log.error("[{}] Error deserializing message for message
position find",
+ topicName,
+ e);
+ } finally {
+ entry.release();
+ }
+ return false;
+ }).thenApply(position -> {
+ if (position == null) {
+ return null;
+ } else {
+ return new MessageIdImpl(position.getLedgerId(),
position.getEntryId(),
+ topicName.getPartitionIndex());
+ }
+ });
+ }
+
protected CompletableFuture<Response> internalPeekNthMessageAsync(String
subName, int messagePosition,
boolean
authoritative) {
CompletableFuture<Void> ret;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index a8e124c84a2..d13ce61753d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -320,6 +321,55 @@ public class CompactedTopicImpl implements CompactedTopic {
});
}
+ CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry>
predicate) {
+ var compactedTopicContextFuture =
this.getCompactedTopicContextFuture();
+
+ if (compactedTopicContextFuture == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return compactedTopicContextFuture.thenCompose(compactedTopicContext
-> {
+ LedgerHandle lh = compactedTopicContext.getLedger();
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(),
promise, null, lh);
+ return promise.thenCompose(index -> {
+ if (index == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return readEntries(lh, index, index).thenApply(entries ->
entries.get(0));
+ });
+ });
+ }
+ private static void findFirstMatchIndexLoop(final Predicate<Entry>
predicate,
+ final long start, final long
end,
+ final CompletableFuture<Long>
promise,
+ final Long lastMatchIndex,
+ final LedgerHandle lh) {
+ if (start > end) {
+ promise.complete(lastMatchIndex);
+ return;
+ }
+
+ long mid = (start + end) / 2;
+ readEntries(lh, mid, mid).thenAccept(entries -> {
+ Entry entry = entries.get(0);
+ final boolean isMatch;
+ try {
+ isMatch = predicate.test(entry);
+ } finally {
+ entry.release();
+ }
+
+ if (isMatch) {
+ findFirstMatchIndexLoop(predicate, start, mid - 1, promise,
mid, lh);
+ } else {
+ findFirstMatchIndexLoop(predicate, mid + 1, end, promise,
lastMatchIndex, lh);
+ }
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+ }
+
private static int comparePositionAndMessageId(PositionImpl p,
MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index 1d3f94dcb90..16543bc7aa7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,7 +22,6 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
import static
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
-import static org.apache.pulsar.compaction.CompactedTopicImpl.readEntries;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -33,7 +32,6 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -116,7 +114,7 @@ public class PulsarTopicCompactionService implements
TopicCompactionService {
final Predicate<Entry> predicate = entry -> {
return
Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >=
publishTime;
};
- return findFirstMatchEntry(predicate);
+ return compactedTopic.findFirstMatchEntry(predicate);
}
@Override
@@ -128,57 +126,7 @@ public class PulsarTopicCompactionService implements
TopicCompactionService {
}
return brokerEntryMetadata.getIndex() >= entryIndex;
};
- return findFirstMatchEntry(predicate);
- }
-
- private CompletableFuture<Entry> findFirstMatchEntry(final
Predicate<Entry> predicate) {
- var compactedTopicContextFuture =
compactedTopic.getCompactedTopicContextFuture();
-
- if (compactedTopicContextFuture == null) {
- return CompletableFuture.completedFuture(null);
- }
- return compactedTopicContextFuture.thenCompose(compactedTopicContext
-> {
- LedgerHandle lh = compactedTopicContext.getLedger();
- CompletableFuture<Long> promise = new CompletableFuture<>();
- findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(),
promise, null, lh);
- return promise.thenCompose(index -> {
- if (index == null) {
- return CompletableFuture.completedFuture(null);
- }
- return readEntries(lh, index, index).thenApply(entries ->
entries.get(0));
- });
- });
- }
-
- private static void findFirstMatchIndexLoop(final Predicate<Entry>
predicate,
- final long start, final long end,
- final CompletableFuture<Long>
promise,
- final Long lastMatchIndex,
- final LedgerHandle lh) {
- if (start > end) {
- promise.complete(lastMatchIndex);
- return;
- }
-
- long mid = (start + end) / 2;
- readEntries(lh, mid, mid).thenAccept(entries -> {
- Entry entry = entries.get(0);
- final boolean isMatch;
- try {
- isMatch = predicate.test(entry);
- } finally {
- entry.release();
- }
-
- if (isMatch) {
- findFirstMatchIndexLoop(predicate, start, mid - 1, promise,
mid, lh);
- } else {
- findFirstMatchIndexLoop(predicate, mid + 1, end, promise,
lastMatchIndex, lh);
- }
- }).exceptionally(ex -> {
- promise.completeExceptionally(ex);
- return null;
- });
+ return compactedTopic.findFirstMatchEntry(predicate);
}
public CompactedTopicImpl getCompactedTopic() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 888d314147a..23cb413614f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -65,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
@@ -87,6 +90,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1450,6 +1454,69 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
.compareTo(id2) > 0);
}
+ @Test
+ public void testGetMessageIdByTimestampWithCompaction() throws Exception {
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1",
"role2"), Set.of("test"));
+ admin.tenants().createTenant("tenant-xyz", tenantInfo);
+ admin.namespaces().createNamespace("tenant-xyz/ns-abc",
Set.of("test"));
+ final String topicName =
"persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
+ @Cleanup
+ ProducerBase<byte[]> producer = (ProducerBase<byte[]>)
pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .intercept(new ProducerInterceptor() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public boolean eligible(Message message) {
+ return true;
+ }
+
+ @Override
+ public Message beforeSend(Producer producer, Message
message) {
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer producer,
Message message, MessageId msgId,
+ Throwable exception) {
+ publishTimeMap.put(message.getMessageId(),
message.getPublishTime());
+ }
+ })
+ .create();
+
+ MessageId id1 =
producer.newMessage().key("K1").value("test1".getBytes()).send();
+ MessageId id2 =
producer.newMessage().key("K2").value("test2".getBytes()).send();
+
+ long publish1 = publishTimeMap.get(id1);
+ long publish2 = publishTimeMap.get(id2);
+ Assert.assertTrue(publish1 < publish2);
+
+ admin.topics().triggerCompaction(topicName);
+ Awaitility.await().untilAsserted(() ->
+ assertSame(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS));
+
+ admin.topics().unload(topicName);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.ledgers.size(), 1);
+ assertEquals(internalStats.ledgers.get(0).entries, 0);
+ });
+
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName,
publish1 - 1), id1);
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName,
publish1), id1);
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName,
publish1 + 1), id2);
+ Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName,
publish2), id2);
+ Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName,
publish2 + 1)
+ .compareTo(id2) > 0);
+ }
+
@Test
public void testGetBatchMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1",
"role2"), Set.of("test"));