This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2bc8f487a71ea812b06d119f24fd7051c82eb6e8 Author: lipenghui <[email protected]> AuthorDate: Fri Feb 25 21:38:47 2022 +0800 Fix can't read the latest message of the compacted topic (#14449) If the reader enabled read compacted and all the data of topic has been compacted to the compacted ledger, the original topic does not have any data. In this case, the reader is not able to read the latest message of the compacted topic. ```java Reader<byte[]> reader = pulsarClient.newReader() .topic(topic) .startMessageId(MessageId.latest) .startMessageIdInclusive() .readCompacted(true) .create(); ``` The root cause is if the `startMessageIdInclusive` is true and the `startMessageId` is `latest`, the reader will get the last message ID from the broker and then seek to the last message. But, the seek method did not consider if there are messages in the compacted ledger, so not able to seek to last message of the compacted ledger. Add force reset option for the managed cursor, if the seek position < compaction horizon, we should force reset the cursor to the given position, so that the reader able to start read from the compacted ledger. (cherry picked from commit ddc51924e90550ef50fd3cdd099b6aec56aec260) --- .../apache/bookkeeper/mledger/ManagedCursor.java | 6 ++- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 ++--- .../mledger/impl/ManagedCursorContainerTest.java | 3 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 6 +-- .../mledger/impl/NonDurableCursorTest.java | 2 +- .../service/persistent/PersistentSubscription.java | 10 ++++- .../pulsar/broker/service/PersistentTopicTest.java | 3 +- .../pulsar/compaction/CompactedTopicTest.java | 50 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 6 ++- 9 files changed, 82 insertions(+), 15 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index d1fb90a..f67cd96 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -533,10 +533,14 @@ public interface ManagedCursor { * * @param position * position to move the cursor to + * @param forceReset + * whether to force reset the position even if the position is no longer in the managed ledger, + * this is used by compacted topic which has data in the compacted ledger, to ensure the cursor can + * read data from the compacted ledger. * @param callback * callback object */ - void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback); + void asyncResetCursor(Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback); /** * Read the specified set of positions from ManagedLedger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8c345026..8c96f0e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1135,7 +1135,7 @@ public class ManagedCursorImpl implements ManagedCursor { } @Override - public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback callback) { + public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) { checkArgument(newPos instanceof PositionImpl); final PositionImpl newPosition = (PositionImpl) newPos; @@ -1143,9 +1143,10 @@ public class ManagedCursorImpl implements ManagedCursor { ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> { PositionImpl actualPosition = newPosition; - if (!ledger.isValidPosition(actualPosition) && - !actualPosition.equals(PositionImpl.earliest) && - !actualPosition.equals(PositionImpl.latest)) { + if (!ledger.isValidPosition(actualPosition) + && !actualPosition.equals(PositionImpl.earliest) + && !actualPosition.equals(PositionImpl.latest) + && !forceReset) { actualPosition = ledger.getNextValidPosition(actualPosition); if (actualPosition == null) { @@ -1168,7 +1169,7 @@ public class ManagedCursorImpl implements ManagedCursor { final Result result = new Result(); final CountDownLatch counter = new CountDownLatch(1); - asyncResetCursor(newPos, new AsyncCallbacks.ResetCursorCallback() { + asyncResetCursor(newPos, false, new AsyncCallbacks.ResetCursorCallback() { @Override public void resetComplete(Object ctx) { counter.countDown(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index af30c9c..56de803 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -237,7 +237,8 @@ public class ManagedCursorContainerTest { } @Override - public void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback) { + public void asyncResetCursor(final Position position, boolean forceReset, + AsyncCallbacks.ResetCursorCallback callback) { } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 676e92f..892d6b3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -687,7 +687,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { CountDownLatch countDownLatch = new CountDownLatch(1); PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2); - cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() { + cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() { @Override public void resetComplete(Object ctx) { moveStatus.set(true); @@ -738,7 +738,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { final PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - (5 * idx)); - cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() { + cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() { @Override public void resetComplete(Object ctx) { moveStatus.set(true); @@ -787,7 +787,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { long lastActive = cursor.getLastActive(); - cursor.asyncResetCursor(lastPosition, new AsyncCallbacks.ResetCursorCallback() { + cursor.asyncResetCursor(lastPosition, false, new AsyncCallbacks.ResetCursorCallback() { @Override public void resetComplete(Object ctx) { moveStatus.set(true); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 4c2944f..5d0271d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -371,7 +371,7 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase { CountDownLatch countDownLatch = new CountDownLatch(1); PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2); - cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() { + cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() { @Override public void resetComplete(Object ctx) { moveStatus.set(true); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 8637ecd..9e57580 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -659,7 +659,15 @@ public class PersistentSubscription implements Subscription { topicName, subName); try { - cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() { + boolean forceReset = false; + if (topic.getCompactedTopic() != null && topic.getCompactedTopic().getCompactionHorizon().isPresent()) { + PositionImpl horizon = (PositionImpl) topic.getCompactedTopic().getCompactionHorizon().get(); + PositionImpl resetTo = (PositionImpl) finalPosition; + if (horizon.compareTo(resetTo) >= 0) { + forceReset = true; + } + } + cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() { @Override public void resetComplete(Object ctx) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d94320f..2314acd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.anyString; @@ -2177,7 +2178,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doAnswer((Answer<Object>) invocationOnMock -> { ((AsyncCallbacks.ResetCursorCallback) invocationOnMock.getArguments()[1]).resetComplete(null); return null; - }).when(mockCursor).asyncResetCursor(any(), any()); + }).when(mockCursor).asyncResetCursor(any(), anyBoolean(), any()); doAnswer((Answer<Object>) invocationOnMock -> { ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null); return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index accce4ef..4ae699b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -734,6 +734,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS)); } + @Test public void testReadCompleteMessagesDuringTopicUnloading() throws Exception { String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" + UUID.randomUUID(); @@ -789,4 +790,53 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages)); } } + + @Test + public void testReadCompactedLatestMessageWithInclusive() throws Exception { + String topic = "persistent://my-property/use/my-ns/testLedgerRollover-" + + UUID.randomUUID(); + final int numMessages = 1; + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .blockIfQueueFull(true) + .enableBatching(false) + .create(); + + CompletableFuture<MessageId> lastMessage = null; + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync(); + } + producer.flush(); + lastMessage.join(); + admin.topics().unload(topic); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); + Assert.assertEquals(stats.compactedLedger.entries, numMessages); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition); + }); + + Awaitility.await() + .pollInterval(3, TimeUnit.SECONDS) + .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + admin.topics().unload(topic); + Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1")); + }); + + @Cleanup + Reader<byte[]> reader = pulsarClient.newReader() + .topic(topic) + .startMessageIdInclusive() + .startMessageId(MessageId.latest) + .readCompacted(true) + .create(); + + Assert.assertTrue(reader.hasMessageAvailable()); + Assert.assertEquals(reader.readNext().getMessageId(), lastMessage.get()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e5a5745..856a775 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2012,8 +2012,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle MessageIdImpl markDeletePosition = MessageIdImpl .convertToMessageIdImpl(response.markDeletePosition); - if (markDeletePosition != null) { - // we only care about comparing ledger ids and entry ids as mark delete position doesn't have other ids such as batch index + if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0 + && markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) { + // we only care about comparing ledger ids and entry ids as mark delete position doesn't have + // other ids such as batch index int result = ComparisonChain.start() .compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId()) .compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())
