This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2e7088d0d503520ca910bd7f80d4e8f9c2d58712
Author: lipenghui <[email protected]>
AuthorDate: Wed Oct 6 09:10:03 2021 +0800

    Fix incorrect last message ID returned when the `lastConfirmedEntry` has
    a negative entry ID. (#12277)
    
    When recovering a ManagedLedger, if the ManagedLedger does not contain any
    ledgers, it will use the current ledger ID and -1 to generate the
    `lastConfirmedEntry`.
    For more details, see:
https://github.com/apache/pulsar/blob/4bc3c405a565b1c756b9b70ff02a63ea06c32c0d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L477
    
    However, for a compacted topic, all the data might have been compacted and
    moved to the compacted ledger.
    In this case, the broker returns X:-1 as the last message ID of the topic
    to the consumer, and the consumer treats the negative entry ID as meaning
    there is no data in the topic.
    As a result, hasMoreMessages returns false even though there is compacted
    data in the topic.
    
    The fix follows the approach of #12161: return the last message ID from
    the compacted ledger if the `lastConfirmedEntry` of the ManagedLedger has
    a negative entry ID.
    
    Improved the `testLastMessageIdForCompactedLedger` test to cover the new 
changes.
    
    (cherry picked from commit 65fb6c6cce3ffac221a559279eb17ce1ef9b885b)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 66 ++++++++++++----------
 .../pulsar/compaction/CompactedTopicTest.java      | 17 ++++++
 2 files changed, 52 insertions(+), 31 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 390d493..7787382 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1688,10 +1688,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         // If it's not pointing to a valid entry, respond messageId of the 
current position.
         if (lastPosition.getEntryId() == -1) {
-            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                    lastPosition.getLedgerId(), lastPosition.getEntryId(), 
partitionIndex, -1,
-                    markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                    markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+            handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, 
partitionIndex,
+                    markDeletePosition);
             return;
         }
 
@@ -1719,33 +1717,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
                 if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
-                    
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry
 -> {
-                        if (entry != null) {
-                            // in this case, all the data has been compacted, 
so return the last position
-                            // in the compacted ledger to the client
-                            MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-                            int bs = metadata.getNumMessagesInBatch();
-                            int largestBatchIndex = bs > 0 ? bs - 1 : -1;
-                            
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                                    entry.getLedgerId(), entry.getEntryId(), 
partitionIndex, largestBatchIndex,
-                                    markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                                    markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
-                            entry.release();
-                        } else {
-                            // in this case, the ledgers been removed except 
the current ledger
-                            // and current ledger without any data
-                            
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                                    -1, -1, partitionIndex, -1,
-                                    markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                                    markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
-                        }
-                    }).exceptionally(ex -> {
-                        ctx.writeAndFlush(Commands.newError(
-                                requestId, ServerError.MetadataError,
-                                "Failed to read last entry of the compacted 
Ledger "
-                                        + ex.getCause().getMessage()));
-                        return null;
-                    });
+                    handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
+                            markDeletePosition);
                 } else {
                     ctx.writeAndFlush(Commands.newError(
                             requestId, ServerError.MetadataError,
@@ -1767,6 +1740,37 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         });
     }
 
+    private void handleLastMessageIdFromCompactedLedger(PersistentTopic 
persistentTopic, long requestId,
+            int partitionIndex, PositionImpl markDeletePosition) {
+        
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry
 -> {
+            if (entry != null) {
+                // in this case, all the data has been compacted, so return 
the last position
+                // in the compacted ledger to the client
+                MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                int bs = metadata.getNumMessagesInBatch();
+                int largestBatchIndex = bs > 0 ? bs - 1 : -1;
+                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                        entry.getLedgerId(), entry.getEntryId(), 
partitionIndex, largestBatchIndex,
+                        markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+                        markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                entry.release();
+            } else {
+                // in this case, the ledgers been removed except the current 
ledger
+                // and current ledger without any data
+                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                        -1, -1, partitionIndex, -1,
+                        markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+                        markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+            }
+        }).exceptionally(ex -> {
+            ctx.writeAndFlush(Commands.newError(
+                    requestId, ServerError.MetadataError,
+                    "Failed to read last entry of the compacted Ledger "
+                            + ex.getCause().getMessage()));
+            return null;
+        });
+    }
+
     private CompletableFuture<Boolean> 
isNamespaceOperationAllowed(NamespaceName namespaceName,
                                                                    
NamespaceOperation operation) {
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 4dc7bf3..3d41088 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -419,5 +419,22 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         MessageId messageId = ((ReaderImpl<String>) 
reader).getConsumer().getLastMessageId();
         Assert.assertEquals(messageId, received.getMessageId());
         Assert.assertFalse(reader.hasMessageAvailable());
+        reader.close();
+
+        // Unload the topic again to simulate entry ID with -1 after all data 
has been compacted.
+        admin.topics().unload(topic);
+        PersistentTopicInternalStats stats2 = 
admin.topics().getInternalStats(topic);
+        Assert.assertTrue(stats2.lastConfirmedEntry.endsWith(":-1"));
+        Assert.assertTrue(stats2.compactedLedger.ledgerId > 0);
+
+        reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test")
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+        Assert.assertTrue(reader.hasMessageAvailable());
+        reader.readNext();
+        Assert.assertFalse(reader.hasMessageAvailable());
     }
 }

Reply via email to