This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2bc8f487a71ea812b06d119f24fd7051c82eb6e8
Author: lipenghui <[email protected]>
AuthorDate: Fri Feb 25 21:38:47 2022 +0800

    Fix can't read the latest message of the compacted topic (#14449)
    
    If a reader has read-compacted enabled and all of the topic's data has
    been compacted into the compacted ledger, the original topic no longer
    holds any data. In this case, the reader is not able to read the latest
    message of the compacted topic.
    
    ```java
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topic)
            .startMessageId(MessageId.latest)
            .startMessageIdInclusive()
            .readCompacted(true)
            .create();
    ```
    
    The root cause is that if `startMessageIdInclusive` is true
    and the `startMessageId` is `latest`, the reader gets the
    last message ID from the broker and then seeks to the last message.
    However, the seek method did not consider whether there are messages in
    the compacted ledger, so it was not able to seek to the last message of
    the compacted ledger.
    
    Add a force-reset option to the managed cursor: if the seek
    position is at or before the compaction horizon, we force-reset the
    cursor to the given position, so that the reader is able to start
    reading from the compacted ledger.
    
    (cherry picked from commit ddc51924e90550ef50fd3cdd099b6aec56aec260)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 ++---
 .../mledger/impl/ManagedCursorContainerTest.java   |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  6 +--
 .../mledger/impl/NonDurableCursorTest.java         |  2 +-
 .../service/persistent/PersistentSubscription.java | 10 ++++-
 .../pulsar/broker/service/PersistentTopicTest.java |  3 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 50 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 ++-
 9 files changed, 82 insertions(+), 15 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1fb90a..f67cd96 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -533,10 +533,14 @@ public interface ManagedCursor {
      *
      * @param position
      *            position to move the cursor to
+     * @param forceReset
+     *            whether to force reset the position even if the position is 
no longer in the managed ledger,
+     *            this is used by compacted topic which has data in the 
compacted ledger, to ensure the cursor can
+     *            read data from the compacted ledger.
      * @param callback
      *            callback object
      */
-    void asyncResetCursor(final Position position, 
AsyncCallbacks.ResetCursorCallback callback);
+    void asyncResetCursor(Position position, boolean forceReset, 
AsyncCallbacks.ResetCursorCallback callback);
 
     /**
      * Read the specified set of positions from ManagedLedger.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8c345026..8c96f0e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1135,7 +1135,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
-    public void asyncResetCursor(Position newPos, 
AsyncCallbacks.ResetCursorCallback callback) {
+    public void asyncResetCursor(Position newPos, boolean forceReset, 
AsyncCallbacks.ResetCursorCallback callback) {
         checkArgument(newPos instanceof PositionImpl);
         final PositionImpl newPosition = (PositionImpl) newPos;
 
@@ -1143,9 +1143,10 @@ public class ManagedCursorImpl implements ManagedCursor {
         ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
             PositionImpl actualPosition = newPosition;
 
-            if (!ledger.isValidPosition(actualPosition) &&
-                !actualPosition.equals(PositionImpl.earliest) &&
-                !actualPosition.equals(PositionImpl.latest)) {
+            if (!ledger.isValidPosition(actualPosition)
+                && !actualPosition.equals(PositionImpl.earliest)
+                && !actualPosition.equals(PositionImpl.latest)
+                && !forceReset) {
                 actualPosition = ledger.getNextValidPosition(actualPosition);
 
                 if (actualPosition == null) {
@@ -1168,7 +1169,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         final Result result = new Result();
         final CountDownLatch counter = new CountDownLatch(1);
 
-        asyncResetCursor(newPos, new AsyncCallbacks.ResetCursorCallback() {
+        asyncResetCursor(newPos, false, new 
AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 counter.countDown();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index af30c9c..56de803 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -237,7 +237,8 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void asyncResetCursor(final Position position, 
AsyncCallbacks.ResetCursorCallback callback) {
+        public void asyncResetCursor(final Position position, boolean 
forceReset,
+                AsyncCallbacks.ResetCursorCallback callback) {
 
         }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 676e92f..892d6b3 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -687,7 +687,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         PositionImpl resetPosition = new 
PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2);
 
-        cursor.asyncResetCursor(resetPosition, new 
AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(resetPosition, false, new 
AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
@@ -738,7 +738,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
                     final PositionImpl resetPosition = new 
PositionImpl(lastPosition.getLedgerId(),
                             lastPosition.getEntryId() - (5 * idx));
 
-                    cursor.asyncResetCursor(resetPosition, new 
AsyncCallbacks.ResetCursorCallback() {
+                    cursor.asyncResetCursor(resetPosition, false, new 
AsyncCallbacks.ResetCursorCallback() {
                         @Override
                         public void resetComplete(Object ctx) {
                             moveStatus.set(true);
@@ -787,7 +787,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         long lastActive = cursor.getLastActive();
 
-        cursor.asyncResetCursor(lastPosition, new 
AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(lastPosition, false, new 
AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 4c2944f..5d0271d 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -371,7 +371,7 @@ public class NonDurableCursorTest extends 
MockedBookKeeperTestCase {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         PositionImpl resetPosition = new 
PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2);
 
-        cursor.asyncResetCursor(resetPosition, new 
AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(resetPosition, false, new 
AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8637ecd..9e57580 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -659,7 +659,15 @@ public class PersistentSubscription implements 
Subscription {
                     topicName, subName);
 
             try {
-                cursor.asyncResetCursor(finalPosition, new 
AsyncCallbacks.ResetCursorCallback() {
+                boolean forceReset = false;
+                if (topic.getCompactedTopic() != null && 
topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
+                    PositionImpl horizon = (PositionImpl) 
topic.getCompactedTopic().getCompactionHorizon().get();
+                    PositionImpl resetTo = (PositionImpl) finalPosition;
+                    if (horizon.compareTo(resetTo) >= 0) {
+                        forceReset = true;
+                    }
+                }
+                cursor.asyncResetCursor(finalPosition, forceReset, new 
AsyncCallbacks.ResetCursorCallback() {
                     @Override
                     public void resetComplete(Object ctx) {
                         if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d94320f..2314acd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.anyString;
@@ -2177,7 +2178,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((AsyncCallbacks.ResetCursorCallback) 
invocationOnMock.getArguments()[1]).resetComplete(null);
             return null;
-        }).when(mockCursor).asyncResetCursor(any(), any());
+        }).when(mockCursor).asyncResetCursor(any(), anyBoolean(), any());
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
             return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index accce4ef..4ae699b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -734,6 +734,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    @Test
     public void testReadCompleteMessagesDuringTopicUnloading() throws 
Exception {
         String topic = 
"persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-"
 +
                 UUID.randomUUID();
@@ -789,4 +790,53 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(reader.readNext().getValue(), 
String.format("msg [%d]", i + numMessages));
         }
     }
+
+    @Test
+    public void testReadCompactedLatestMessageWithInclusive() throws Exception 
{
+        String topic = 
"persistent://my-property/use/my-ns/testLedgerRollover-" +
+                UUID.randomUUID();
+        final int numMessages = 1;
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + 
"").value(String.format("msg [%d]", i)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().unload(topic);
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, 
stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+
+        Awaitility.await()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+                    admin.topics().unload(topic);
+                    
Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1"));
+                });
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.latest)
+                .readCompacted(true)
+                .create();
+
+        Assert.assertTrue(reader.hasMessageAvailable());
+        Assert.assertEquals(reader.readNext().getMessageId(), 
lastMessage.get());
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e5a5745..856a775 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2012,8 +2012,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     MessageIdImpl markDeletePosition = MessageIdImpl
                             
.convertToMessageIdImpl(response.markDeletePosition);
 
-                    if (markDeletePosition != null) {
-                        // we only care about comparing ledger ids and entry 
ids as mark delete position doesn't have other ids such as batch index
+                    if (markDeletePosition != null && 
!(markDeletePosition.getEntryId() < 0
+                            && markDeletePosition.getLedgerId() > 
lastMessageId.getLedgerId())) {
+                        // we only care about comparing ledger ids and entry 
ids as mark delete position doesn't have
+                        // other ids such as batch index
                         int result = ComparisonChain.start()
                                 .compare(markDeletePosition.getLedgerId(), 
lastMessageId.getLedgerId())
                                 .compare(markDeletePosition.getEntryId(), 
lastMessageId.getEntryId())

Reply via email to