This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cb12a57d6f [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
5cb12a57d6f is described below

commit 5cb12a57d6fa9ee2a611c5b1ef7db780fb57ade5
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Jan 20 09:35:56 2024 +0800

    [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 60 ++++++++++++++-----
 .../pulsar/compaction/CompactedTopicImpl.java      | 50 ++++++++++++++++
 .../compaction/PulsarTopicCompactionService.java   | 56 +-----------------
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 67 ++++++++++++++++++++++
 4 files changed, 166 insertions(+), 67 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4ae765a0caa..b7a654cc0a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2885,28 +2885,62 @@ public class PersistentTopicsBase extends AdminResource 
{
                     throw new RestException(Status.METHOD_NOT_ALLOWED,
                         "Get message ID by timestamp on a non-persistent topic 
is not allowed");
                 }
-                ManagedLedger ledger = ((PersistentTopic) 
topic).getManagedLedger();
-                return ledger.asyncFindPosition(entry -> {
+                final PersistentTopic persistentTopic = (PersistentTopic) 
topic;
+
+                return 
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry
 -> {
+                    if (lastEntry == null) {
+                        return findMessageIdByPublishTime(timestamp, 
persistentTopic.getManagedLedger());
+                    }
+                    MessageMetadata metadata;
+                    Position position = lastEntry.getPosition();
                     try {
-                        long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
-                        return 
MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
-                    } catch (Exception e) {
-                        log.error("[{}] Error deserializing message for 
message position find", topicName, e);
+                        metadata = 
Commands.parseMessageMetadata(lastEntry.getDataBuffer());
                     } finally {
-                        entry.release();
+                        lastEntry.release();
                     }
-                    return false;
-                }).thenApply(position -> {
-                    if (position == null) {
-                        return null;
+                    if (timestamp == metadata.getPublishTime()) {
+                        return CompletableFuture.completedFuture(new 
MessageIdImpl(position.getLedgerId(),
+                                position.getEntryId(), 
topicName.getPartitionIndex()));
+                    } else if (timestamp < metadata.getPublishTime()) {
+                        return 
persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
+                                .thenApply(compactedEntry -> {
+                                    try {
+                                        return new 
MessageIdImpl(compactedEntry.getLedgerId(),
+                                                compactedEntry.getEntryId(), 
topicName.getPartitionIndex());
+                                    } finally {
+                                        compactedEntry.release();
+                                    }
+                                });
                     } else {
-                        return new MessageIdImpl(position.getLedgerId(), 
position.getEntryId(),
-                            topicName.getPartitionIndex());
+                        return findMessageIdByPublishTime(timestamp, 
persistentTopic.getManagedLedger());
                     }
                 });
             });
     }
 
+    private CompletableFuture<MessageId> findMessageIdByPublishTime(long 
timestamp, ManagedLedger managedLedger) {
+        return managedLedger.asyncFindPosition(entry -> {
+            try {
+                long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, 
timestamp);
+            } catch (Exception e) {
+                log.error("[{}] Error deserializing message for message 
position find",
+                    topicName,
+                    e);
+            } finally {
+                entry.release();
+            }
+            return false;
+        }).thenApply(position -> {
+            if (position == null) {
+                return null;
+            } else {
+                return new MessageIdImpl(position.getLedgerId(), 
position.getEntryId(),
+                    topicName.getPartitionIndex());
+            }
+        });
+    }
+
     protected CompletableFuture<Response> internalPeekNthMessageAsync(String 
subName, int messagePosition,
                                                                       boolean 
authoritative) {
         CompletableFuture<Void> ret;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index a8e124c84a2..d13ce61753d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -320,6 +321,55 @@ public class CompactedTopicImpl implements CompactedTopic {
         });
     }
 
+    CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> 
predicate) {
+        var compactedTopicContextFuture = 
this.getCompactedTopicContextFuture();
+
+        if (compactedTopicContextFuture == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return compactedTopicContextFuture.thenCompose(compactedTopicContext 
-> {
+            LedgerHandle lh = compactedTopicContext.getLedger();
+            CompletableFuture<Long> promise = new CompletableFuture<>();
+            findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), 
promise, null, lh);
+            return promise.thenCompose(index -> {
+                if (index == null) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                return readEntries(lh, index, index).thenApply(entries -> 
entries.get(0));
+            });
+        });
+    }
+    private static void findFirstMatchIndexLoop(final Predicate<Entry> 
predicate,
+                                                final long start, final long 
end,
+                                                final CompletableFuture<Long> 
promise,
+                                                final Long lastMatchIndex,
+                                                final LedgerHandle lh) {
+        if (start > end) {
+            promise.complete(lastMatchIndex);
+            return;
+        }
+
+        long mid = (start + end) / 2;
+        readEntries(lh, mid, mid).thenAccept(entries -> {
+            Entry entry = entries.get(0);
+            final boolean isMatch;
+            try {
+                isMatch = predicate.test(entry);
+            } finally {
+                entry.release();
+            }
+
+            if (isMatch) {
+                findFirstMatchIndexLoop(predicate, start, mid - 1, promise, 
mid, lh);
+            } else {
+                findFirstMatchIndexLoop(predicate, mid + 1, end, promise, 
lastMatchIndex, lh);
+            }
+        }).exceptionally(ex -> {
+            promise.completeExceptionally(ex);
+            return null;
+        });
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, 
MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index 1d3f94dcb90..16543bc7aa7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,7 +22,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
 import static 
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
-import static org.apache.pulsar.compaction.CompactedTopicImpl.readEntries;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -33,7 +32,6 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -116,7 +114,7 @@ public class PulsarTopicCompactionService implements 
TopicCompactionService {
         final Predicate<Entry> predicate = entry -> {
             return 
Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= 
publishTime;
         };
-        return findFirstMatchEntry(predicate);
+        return compactedTopic.findFirstMatchEntry(predicate);
     }
 
     @Override
@@ -128,57 +126,7 @@ public class PulsarTopicCompactionService implements 
TopicCompactionService {
             }
             return brokerEntryMetadata.getIndex() >= entryIndex;
         };
-        return findFirstMatchEntry(predicate);
-    }
-
-    private CompletableFuture<Entry> findFirstMatchEntry(final 
Predicate<Entry> predicate) {
-        var compactedTopicContextFuture = 
compactedTopic.getCompactedTopicContextFuture();
-
-        if (compactedTopicContextFuture == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return compactedTopicContextFuture.thenCompose(compactedTopicContext 
-> {
-            LedgerHandle lh = compactedTopicContext.getLedger();
-            CompletableFuture<Long> promise = new CompletableFuture<>();
-            findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), 
promise, null, lh);
-            return promise.thenCompose(index -> {
-                if (index == null) {
-                    return CompletableFuture.completedFuture(null);
-                }
-                return readEntries(lh, index, index).thenApply(entries -> 
entries.get(0));
-            });
-        });
-    }
-
-    private static void findFirstMatchIndexLoop(final Predicate<Entry> 
predicate,
-                                           final long start, final long end,
-                                           final CompletableFuture<Long> 
promise,
-                                           final Long lastMatchIndex,
-                                           final LedgerHandle lh) {
-        if (start > end) {
-            promise.complete(lastMatchIndex);
-            return;
-        }
-
-        long mid = (start + end) / 2;
-        readEntries(lh, mid, mid).thenAccept(entries -> {
-            Entry entry = entries.get(0);
-            final boolean isMatch;
-            try {
-                isMatch = predicate.test(entry);
-            } finally {
-                entry.release();
-            }
-
-            if (isMatch) {
-                findFirstMatchIndexLoop(predicate, start, mid - 1, promise, 
mid, lh);
-            } else {
-                findFirstMatchIndexLoop(predicate, mid + 1, end, promise, 
lastMatchIndex, lh);
-            }
-        }).exceptionally(ex -> {
-            promise.completeExceptionally(ex);
-            return null;
-        });
+        return compactedTopic.findFirstMatchEntry(predicate);
     }
 
     public CompactedTopicImpl getCompactedTopic() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 888d314147a..23cb413614f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
@@ -87,6 +90,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1450,6 +1454,69 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
                 .compareTo(id2) > 0);
     }
 
+    @Test
+    public void testGetMessageIdByTimestampWithCompaction() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", 
Set.of("test"));
+        final String topicName = 
"persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
+        @Cleanup
+        ProducerBase<byte[]> producer = (ProducerBase<byte[]>) 
pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .intercept(new ProducerInterceptor() {
+                    @Override
+                    public void close() {
+
+                    }
+
+                    @Override
+                    public boolean eligible(Message message) {
+                        return true;
+                    }
+
+                    @Override
+                    public Message beforeSend(Producer producer, Message 
message) {
+                        return message;
+                    }
+
+                    @Override
+                    public void onSendAcknowledgement(Producer producer, 
Message message, MessageId msgId,
+                                                      Throwable exception) {
+                        publishTimeMap.put(message.getMessageId(), 
message.getPublishTime());
+                    }
+                })
+                .create();
+
+        MessageId id1 = 
producer.newMessage().key("K1").value("test1".getBytes()).send();
+        MessageId id2 = 
producer.newMessage().key("K2").value("test2".getBytes()).send();
+
+        long publish1 = publishTimeMap.get(id1);
+        long publish2 = publishTimeMap.get(id2);
+        Assert.assertTrue(publish1 < publish2);
+
+        admin.topics().triggerCompaction(topicName);
+        Awaitility.await().untilAsserted(() ->
+            assertSame(admin.topics().compactionStatus(topicName).status,
+                LongRunningProcessStatus.Status.SUCCESS));
+
+        admin.topics().unload(topicName);
+        Awaitility.await().untilAsserted(() -> {
+                PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicName, false);
+                assertEquals(internalStats.ledgers.size(), 1);
+                assertEquals(internalStats.ledgers.get(0).entries, 0);
+        });
+
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, 
publish1 - 1), id1);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, 
publish1), id1);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, 
publish1 + 1), id2);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, 
publish2), id2);
+        Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, 
publish2 + 1)
+                .compareTo(id2) > 0);
+    }
+
     @Test
     public void testGetBatchMessageIdByTimestamp() throws Exception {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));

Reply via email to