This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 29533f1c45015c177ec8d441189bf42bdeddd8e0
Author: ran <[email protected]>
AuthorDate: Mon Dec 27 09:45:38 2021 +0800

    Fix getting last message id from empty compact ledger (#13476)
    
    ### Motivation
    
    Currently, if the last confirmed entry points to an empty ledger, the 
get-last-message-id operation reads from the compacted ledger; if the compacted 
ledger is also empty, the read fails with `IncorrectParameterException`.
    
    **Broker error message**
    ```
    [pulsar-io-29-9] ERROR org.apache.bookkeeper.client.LedgerHandle - 
IncorrectParameterException on ledgerId:617 firstEntry:-1 lastEntry:-1
    ```
    
    **Client error log**
    ```
    Exception in thread "main" 
org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The 
subscription reader-bf9246cfcb of the topic persistent://public/ns-test/t1 gets 
the last message id was failed
    {"errorMsg":"Failed to read last entry of the compacted Ledger Incorrect 
parameter input","reqId":79405902881798690, 
"remote":"localhost/127.0.0.1:6650", "local":"/127.0.0.1:55207"}
        at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1034)
        at 
org.apache.pulsar.client.impl.ConsumerImpl.hasMessageAvailable(ConsumerImpl.java:2001)
        at 
org.apache.pulsar.client.impl.ReaderImpl.hasMessageAvailable(ReaderImpl.java:181)
        at 
org.apache.pulsar.compaction.CompactedTopicTest.main(CompactedTopicTest.java:730)
    ```
    
    ### Modifications
    
    Check the compacted ledger's last confirmed entry id before reading an 
entry from it; if the ledger has no entries, return a null value.
    
    (cherry picked from commit 8136762e743fe1bc4be2adeb08631a8c44719d37)
    (cherry picked from commit 472f02d1799eb18ee5a32308f22c570f70ec1073)
---
 .../pulsar/compaction/CompactedTopicImpl.java      | 15 ++++++---
 .../pulsar/compaction/CompactedTopicTest.java      | 38 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 4577540..a6d6fc9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -293,11 +293,16 @@ public class CompactedTopicImpl implements CompactedTopic 
{
         if (compactionHorizon == null) {
             return CompletableFuture.completedFuture(null);
         }
-        return compactedTopicContext.thenCompose(context ->
-                readEntries(context.ledger, 
context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
-                        .thenCompose(entries -> entries.size() > 0
-                                ? 
CompletableFuture.completedFuture(entries.get(0))
-                                : CompletableFuture.completedFuture(null)));
+        return compactedTopicContext.thenCompose(context -> {
+            if (context.ledger.getLastAddConfirmed() == -1) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return readEntries(
+                    context.ledger, context.ledger.getLastAddConfirmed(), 
context.ledger.getLastAddConfirmed())
+                    .thenCompose(entries -> entries.size() > 0
+                            ? CompletableFuture.completedFuture(entries.get(0))
+                            : CompletableFuture.completedFuture(null));
+        });
     }
 
     private static int comparePositionAndMessageId(PositionImpl p, 
MessageIdData m) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 69d66d9..d44868a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -650,4 +650,42 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         reader.close();
         producer.close();
     }
+
+    @Test(timeOut = 1000 * 30)
+    public void testReader() throws Exception {
+        final String ns = "my-property/use/my-ns";
+        String topic = "persistent://" + ns + "/t1";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        producer.newMessage().key("k").value(("value").getBytes()).send();
+        producer.newMessage().key("k").value(null).send();
+        pulsar.getCompactor().compact(topic).get();
+
+        Awaitility.await()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            admin.topics().unload(topic);
+            Thread.sleep(100);
+            
Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1"));
+        });
+        // Make sure the last confirm entry is -1, then get last message id 
from compact ledger
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topic);
+        Assert.assertTrue(internalStats.lastConfirmedEntry.endsWith("-1"));
+        // Because the latest value of the key `k` is null, so there is no 
data in compact ledger.
+        Assert.assertEquals(internalStats.compactedLedger.size, 0);
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+        Assert.assertFalse(reader.hasMessageAvailable());
+    }
+
 }

Reply via email to