This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 29533f1c45015c177ec8d441189bf42bdeddd8e0 Author: ran <[email protected]> AuthorDate: Mon Dec 27 09:45:38 2021 +0800 Fix getting last message id from empty compact ledger (#13476) ### Motivation Currently, if the topic's last confirmed entry belongs to an empty ledger, the get-last-message-id operation falls back to reading from the compacted ledger. If the compacted ledger is also empty, that read fails with `IncorrectParameterException`. **Broker error message** ``` [pulsar-io-29-9] ERROR org.apache.bookkeeper.client.LedgerHandle - IncorrectParameterException on ledgerId:617 firstEntry:-1 lastEntry:-1 ``` **Client error log** ``` Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription reader-bf9246cfcb of the topic persistent://public/ns-test/t1 gets the last message id was failed {"errorMsg":"Failed to read last entry of the compacted Ledger Incorrect parameter input","reqId":79405902881798690, "remote":"localhost/127.0.0.1:6650", "local":"/127.0.0.1:55207"} at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1034) at org.apache.pulsar.client.impl.ConsumerImpl.hasMessageAvailable(ConsumerImpl.java:2001) at org.apache.pulsar.client.impl.ReaderImpl.hasMessageAvailable(ReaderImpl.java:181) at org.apache.pulsar.compaction.CompactedTopicTest.main(CompactedTopicTest.java:730) ``` ### Modifications Check the compacted ledger's last confirmed entry id before reading from it. If the ledger contains no entries (the last confirmed entry id is -1), return a null value instead of issuing the read. 
(cherry picked from commit 8136762e743fe1bc4be2adeb08631a8c44719d37) (cherry picked from commit 472f02d1799eb18ee5a32308f22c570f70ec1073) --- .../pulsar/compaction/CompactedTopicImpl.java | 15 ++++++--- .../pulsar/compaction/CompactedTopicTest.java | 38 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 4577540..a6d6fc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -293,11 +293,16 @@ public class CompactedTopicImpl implements CompactedTopic { if (compactionHorizon == null) { return CompletableFuture.completedFuture(null); } - return compactedTopicContext.thenCompose(context -> - readEntries(context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed()) - .thenCompose(entries -> entries.size() > 0 - ? CompletableFuture.completedFuture(entries.get(0)) - : CompletableFuture.completedFuture(null))); + return compactedTopicContext.thenCompose(context -> { + if (context.ledger.getLastAddConfirmed() == -1) { + return CompletableFuture.completedFuture(null); + } + return readEntries( + context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed()) + .thenCompose(entries -> entries.size() > 0 + ? 
CompletableFuture.completedFuture(entries.get(0)) + : CompletableFuture.completedFuture(null)); + }); } private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 69d66d9..d44868a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -650,4 +650,42 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { reader.close(); producer.close(); } + + @Test(timeOut = 1000 * 30) + public void testReader() throws Exception { + final String ns = "my-property/use/my-ns"; + String topic = "persistent://" + ns + "/t1"; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + producer.newMessage().key("k").value(("value").getBytes()).send(); + producer.newMessage().key("k").value(null).send(); + pulsar.getCompactor().compact(topic).get(); + + Awaitility.await() + .pollInterval(3, TimeUnit.SECONDS) + .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + admin.topics().unload(topic); + Thread.sleep(100); + Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1")); + }); + // Make sure the last confirm entry is -1, then get last message id from compact ledger + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic); + Assert.assertTrue(internalStats.lastConfirmedEntry.endsWith("-1")); + // Because the latest value of the key `k` is null, so there is no data in compact ledger. 
+ Assert.assertEquals(internalStats.compactedLedger.size, 0); + + @Cleanup + Reader<byte[]> reader = pulsarClient.newReader() + .topic(topic) + .startMessageIdInclusive() + .startMessageId(MessageId.earliest) + .readCompacted(true) + .create(); + Assert.assertFalse(reader.hasMessageAvailable()); + } + }
