This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2e7088d0d503520ca910bd7f80d4e8f9c2d58712 Author: lipenghui <[email protected]> AuthorDate: Wed Oct 6 09:10:03 2021 +0800 Fix incorrect returned last message ID while the `lastConfirmedEntry` with negative entry ID. (#12277) When recovering a ManagedLedger, if the ManagedLedger does not contain any ledgers, the ManagedLedger will use the current Ledger ID and -1 to generate the `lastConfirmedEntry`. For more details, see: https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477 But for a compacted topic, all the data might have been compacted and moved to the compacted Ledger. In this case, the broker will return X:-1 as the last message ID of the topic to the consumer, and the consumer will treat the negative entry ID as meaning there is no data in this topic, so hasMoreMessages will return false even though there is compacted data in the topic. The fix, as in #12161, is to return the last message ID from the compacted Ledger if the `lastConfirmedEntry` of the ManagedLedger has a negative entry ID. Improved the `testLastMessageIdForCompactedLedger` test to cover the new changes. (cherry picked from commit 65fb6c6cce3ffac221a559279eb17ce1ef9b885b) --- .../apache/pulsar/broker/service/ServerCnx.java | 66 ++++++++++++---------- .../pulsar/compaction/CompactedTopicTest.java | 17 ++++++ 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 390d493..7787382 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1688,10 +1688,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // If it's not pointing to a valid entry, respond messageId of the current position. 
if (lastPosition.getEntryId() == -1) { - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, + markDeletePosition); return; } @@ -1719,33 +1717,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { batchSizeFuture.whenComplete((batchSize, e) -> { if (e != null) { if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { - persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> { - if (entry != null) { - // in this case, all the data has been compacted, so return the last position - // in the compacted ledger to the client - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - int bs = metadata.getNumMessagesInBatch(); - int largestBatchIndex = bs > 0 ? bs - 1 : -1; - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); - entry.release(); - } else { - // in this case, the ledgers been removed except the current ledger - // and current ledger without any data - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1)); - } - }).exceptionally(ex -> { - ctx.writeAndFlush(Commands.newError( - requestId, ServerError.MetadataError, - "Failed to read last entry of the compacted Ledger " - + ex.getCause().getMessage())); - return null; - }); + handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, + markDeletePosition); } else { ctx.writeAndFlush(Commands.newError( requestId, ServerError.MetadataError, @@ -1767,6 +1740,37 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { }); } + private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long requestId, + int partitionIndex, PositionImpl markDeletePosition) { + persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> { + if (entry != null) { + // in this case, all the data has been compacted, so return the last position + // in the compacted ledger to the client + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + int bs = metadata.getNumMessagesInBatch(); + int largestBatchIndex = bs > 0 ? bs - 1 : -1; + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, + entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + entry.release(); + } else { + // in this case, the ledgers been removed except the current ledger + // and current ledger without any data + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, + -1, -1, partitionIndex, -1, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1)); + } + }).exceptionally(ex -> { + ctx.writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, + "Failed to read last entry of the compacted Ledger " + + ex.getCause().getMessage())); + return null; + }); + } + private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName, NamespaceOperation operation) { CompletableFuture<Boolean> isProxyAuthorizedFuture; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 4dc7bf3..3d41088 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -419,5 +419,22 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { MessageId messageId = ((ReaderImpl<String>) reader).getConsumer().getLastMessageId(); Assert.assertEquals(messageId, received.getMessageId()); Assert.assertFalse(reader.hasMessageAvailable()); + reader.close(); + + // Unload the topic again to simulate entry ID with -1 after all data has been compacted. + admin.topics().unload(topic); + PersistentTopicInternalStats stats2 = admin.topics().getInternalStats(topic); + Assert.assertTrue(stats2.lastConfirmedEntry.endsWith(":-1")); + Assert.assertTrue(stats2.compactedLedger.ledgerId > 0); + + reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test") + .readCompacted(true) + .startMessageId(MessageId.earliest) + .create(); + Assert.assertTrue(reader.hasMessageAvailable()); + reader.readNext(); + Assert.assertFalse(reader.hasMessageAvailable()); } }
