This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 85757732f6c46a9b56fa056efbdafc1c0be97c10
Author: lipenghui <[email protected]>
AuthorDate: Wed Nov 10 15:58:51 2021 +0800

    ### Motivation (#12698)
    
    Fix compaction data loss caused by compaction properties being dropped during 
reset-cursor.
    
    1. The compaction reader seeks to the earliest position to read data 
from the topic, but the compaction properties were dropped during the cursor 
reset. As a result, the compaction subscription is initialized without a 
compaction horizon, so the compaction reader skips the last compacted data. This 
only happens when the compaction subscription is initialized, so it can be 
triggered by load balancing or by manually unloading the topic.
    
    2. Advancing the cursor should also keep the properties; otherwise, the 
properties will be lost during cursor trimming.
    
    ### Changes
    
    1. Keep the properties when resetting the cursor if the cursor is used for 
data compaction.
    2. Copy the properties to the new mark-delete entry when advancing the 
cursor. This is triggered internally by the managed ledger, so it is not 
specific to compacted topics; the internal task should not lose the properties 
when trimming the cursor.
    
    ### Tests
    
    New tests were added to make sure that compaction does not lose data during 
topic unloading and that the reader can read all the compacted data after the 
compaction task completes.
    
    (cherry picked from commit 98e2c66a7b2d5fd42641527cd9ad6c3b497d65c7)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../broker/intercept/CounterBrokerInterceptor.java |  8 +--
 .../pulsar/compaction/CompactedTopicTest.java      | 70 +++++++++++++++++++++-
 4 files changed, 82 insertions(+), 8 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dd715ff..f13fa93 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -194,6 +194,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private long entriesReadCount;
     private long entriesReadSize;
     private int individualDeletedMessagesSerializedSize;
+    private static final String COMPACTION_CURSOR_NAME = "__compaction";
 
     class MarkDeleteEntry {
         final PositionImpl newPosition;
@@ -1068,7 +1069,8 @@ public class ManagedCursorImpl implements ManagedCursor {
                                 Range.closedOpen(markDeletePosition, 
newMarkDeletePosition)));
                     }
                     markDeletePosition = newMarkDeletePosition;
-                    lastMarkDeleteEntry = new 
MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
+                    lastMarkDeleteEntry = new 
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
+                            getProperties() : Collections.emptyMap(),
                             null, null);
                     individualDeletedMessages.clear();
                     if (config.isDeletionAtBatchIndexLevelEnabled() && 
batchDeletedIndexes != null) {
@@ -1118,7 +1120,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         };
 
-        internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new 
MarkDeleteCallback() {
+        internalAsyncMarkDelete(newPosition, isCompactionCursor() ? 
getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 finalCallback.operationComplete();
@@ -3066,5 +3068,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         return isReadPositionOnTail || isReadPositionChanged;
     }
 
+    private boolean isCompactionCursor() {
+        return COMPACTION_CURSOR_NAME.equals(name);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a59ce22..caff296 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2550,7 +2550,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (highestPositionToDelete.compareTo((PositionImpl) 
cursor.getMarkDeletedPosition()) > 0
                     && highestPositionToDelete.compareTo((PositionImpl) 
cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
                     && !(!cursor.isDurable() && cursor instanceof 
NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
-                cursor.asyncMarkDelete(highestPositionToDelete, new 
MarkDeleteCallback() {
+                cursor.asyncMarkDelete(highestPositionToDelete, 
cursor.getProperties(), new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index dc51c3d..1462cfa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -56,14 +56,14 @@ public class CounterBrokerInterceptor implements 
BrokerInterceptor {
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        log.info("Send message to topic {}, subscription {}",
+        log.debug("Send message to topic {}, subscription {}",
             subscription.getTopic(), subscription.getName());
         beforeSendCount++;
     }
 
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
-        log.info("[{}] On [{}] Pulsar command", count, 
command.getType().name());
+        log.debug("[{}] On [{}] Pulsar command", count, 
command.getType().name());
         count ++;
     }
 
@@ -75,13 +75,13 @@ public class CounterBrokerInterceptor implements 
BrokerInterceptor {
     @Override
     public void onWebserviceRequest(ServletRequest request) {
         count ++;
-        log.info("[{}] On [{}] Webservice request", count, 
((HttpServletRequest)request).getRequestURL().toString());
+        log.debug("[{}] On [{}] Webservice request", count, 
((HttpServletRequest)request).getRequestURL().toString());
     }
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse 
response) {
         count ++;
-        log.info("[{}] On [{}] Webservice response {}", count, 
((HttpServletRequest)request).getRequestURL().toString(), response);
+        log.debug("[{}] On [{}] Webservice response {}", count, 
((HttpServletRequest)request).getRequestURL().toString(), response);
         if (response instanceof Response) {
             Response res = (Response) response;
             responseList.add(new 
ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), 
res.getStatus()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index cbe7372..69d66d9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
@@ -577,9 +578,76 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         // The reader should read all 600 keys
         int received = 0;
         while (reader.hasMessageAvailable()) {
-            System.out.println(reader.readNext().getKey());
+            reader.readNext();
             received++;
         }
         Assert.assertEquals(received, keys * 3);
+        reader.close();
+        producer.close();
+    }
+
+    @Test(timeOut = 120000)
+    public void testCompactionWithTopicUnloading() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testCompactionWithTopicUnloading-" +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 500;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + 
"").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().pollInterval(5, TimeUnit.SECONDS).untilAsserted(() 
-> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        admin.topics().unload(topic);
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys) + 
"").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Thread.sleep(100);
+        admin.topics().unload(topic);
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().pollInterval(3, TimeUnit.SECONDS).atMost(30, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys * 2);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        // Start a new reader to reading messages
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .receiverQueueSize(10)
+                .create();
+
+        // The reader should read all 1000 keys (keys * 2)
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            reader.readNext();
+            received++;
+        }
+        Assert.assertEquals(received, keys * 2);
+        reader.close();
+        producer.close();
     }
 }

Reply via email to