This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1fbc7ed9ab0bbe1c75d6576bc37933afd4bbc42f
Author: lipenghui <[email protected]>
AuthorDate: Thu Oct 21 09:52:47 2021 +0800

    Fix compactor skips data from last compacted Ledger (#12429)
    
    ## Motivation
    
    This PR fixes compacted data being lost during topic compaction.
    Only a few events had been deleted, yet the number of events in the
    compacted ledger dropped significantly.
    
    
![image](https://user-images.githubusercontent.com/12592133/138008777-00eb7c0b-358e-4291-bfd4-f4b27cbedbf4.png)
    
    After investigating the issue in more detail, we found that only the
    first read operation reads data from the compacted ledger; starting
    with the second read operation, the broker reads data from the
    original topic instead.
    
    ```
    2021-10-19T23:09:30,021+0800 [broker-topic-workers-OrderedScheduler-7-0] 
INFO  org.apache.pulsar.compaction.CompactedTopicImpl - 
=====[public/default/persistent/c499d42c-75d7-48d1-9225-2e724c0e1d83] Read from 
compacted Ledger = cursor position: -1:-1, Horizon: 16:-1, isFirstRead: true
    2021-10-19T23:09:30,049+0800 [broker-topic-workers-OrderedScheduler-7-0] 
INFO  org.apache.pulsar.compaction.CompactedTopicImpl - 
=====[public/default/persistent/c499d42c-75d7-48d1-9225-2e724c0e1d83] Read from 
original Ledger = cursor position: 16:0, Horizon: 16:-1, isFirstRead: false
    ```
    
    ## Modifications
    
    The compaction task builds the new snapshot from the last snapshot plus
    the incremental entries. Therefore, for the compaction cursor, we need to
    force-seek the read position so that the compactor can read the complete
    last snapshot, because the compactor reads data located before the
    compaction cursor's mark-delete position.
    
    ## Verifying this change
    
    Added a new test that checks that compacted data is not lost.
    
    (cherry picked from commit 1830f90d08acc079c6ee5a5ec05751ab4cbee490)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  6 +--
 .../mledger/impl/ManagedCursorContainerTest.java   |  2 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  9 ++++-
 .../pulsar/compaction/CompactedTopicTest.java      | 45 ++++++++++++++++++++++
 5 files changed, 61 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 4af6455..72ee1a1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -425,7 +425,11 @@ public interface ManagedCursor {
      * @param newReadPosition
      *            the position where to move the cursor
      */
-    void seek(Position newReadPosition);
+    default void seek(Position newReadPosition) {
+        seek(newReadPosition, false);
+    }
+
+    void seek(Position newReadPosition, boolean force);
 
     /**
      * Clear the cursor backlog.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 82b4cfc..8e794df 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2163,18 +2163,16 @@ public class ManagedCursorImpl implements ManagedCursor 
{
     }
 
     @Override
-    public void seek(Position newReadPositionInt) {
+    public void seek(Position newReadPositionInt, boolean force) {
         checkArgument(newReadPositionInt instanceof PositionImpl);
         PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;
 
         lock.writeLock().lock();
         try {
-            if (newReadPosition.compareTo(markDeletePosition) <= 0) {
+            if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
                 // Make sure the newReadPosition comes after the mark delete 
position
                 newReadPosition = 
ledger.getNextValidPosition(markDeletePosition);
             }
-
-            PositionImpl oldReadPosition = readPosition;
             readPosition = newReadPosition;
         } finally {
             lock.writeLock().unlock();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 6b9c009..57e1964 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -175,7 +175,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void seek(Position newReadPosition) {
+        public void seek(Position newReadPosition, boolean force) {
         }
 
         @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 1313413..4375188 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.ComparisonChain;
@@ -122,7 +123,13 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 return readEntries(context.ledger, startPoint, 
endPoint)
                                     .thenAccept((entries) -> {
                                         Entry lastEntry = 
entries.get(entries.size() - 1);
-                                        
cursor.seek(lastEntry.getPosition().getNext());
+                                        // The compaction task depends on the 
last snapshot and the incremental
+                                        // entries to build the new snapshot. 
So for the compaction cursor, we
+                                        // need to force seek the read 
position to ensure the compactor can read
+                                        // the complete last snapshot because 
of the compactor will read the data
+                                        // before the compaction cursor mark 
delete position
+                                        
cursor.seek(lastEntry.getPosition().getNext(),
+                                                
cursor.getName().equals(COMPACTION_SUBSCRIPTION));
                                         callback.readEntriesComplete(entries, 
consumer);
                                     });
                             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 3d41088..adb6f46 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
 
@@ -437,4 +438,48 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         reader.readNext();
         Assert.assertFalse(reader.hasMessageAvailable());
     }
+
+    @Test
+    public void testDoNotLossTheLastCompactedLedgerData() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testDoNotLossTheLastCompactedLedgerData-" +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 200;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + 
"").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+        admin.topics().unload(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertEquals(stats.ledgers.size(), 1);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+        admin.topics().unload(topic);
+        // Send one more key to and then to trigger the compaction
+        producer.newMessage().key(keys + "").value(msg).send();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertEquals(stats.compactedLedger.entries, keys + 1);
+        });
+    }
 }

Reply via email to