This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3a30526ac93 [fix][broker] Fix write duplicate entries into the compacted ledger after RawReader reconnects (#21081) (#21165)
3a30526ac93 is described below

commit 3a30526ac936101d00eec6c053a6119c091be382
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Sep 18 18:15:14 2023 +0800

    [fix][broker] Fix write duplicate entries into the compacted ledger after RawReader reconnects (#21081) (#21165)
---
 .../pulsar/compaction/TwoPhaseCompactor.java       | 13 ++++-
 .../apache/pulsar/compaction/CompactionTest.java   | 66 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 886a523df02..be08bd81c1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -207,7 +207,7 @@ public class TwoPhaseCompactor extends Compactor {
         reader.seekAsync(from).thenCompose((v) -> {
             Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
             CompletableFuture<Void> loopPromise = new CompletableFuture<>();
-            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
+            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise, MessageId.earliest);
             return loopPromise;
         }).thenCompose((v) -> closeLedger(ledger))
                 .thenCompose((v) -> 
reader.acknowledgeCumulativeAsync(lastReadId,
@@ -229,7 +229,8 @@ public class TwoPhaseCompactor extends Compactor {
     }
 
     private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, 
MessageId> latestForKey,
-                              LedgerHandle lh, Semaphore outstanding, 
CompletableFuture<Void> promise) {
+                              LedgerHandle lh, Semaphore outstanding, 
CompletableFuture<Void> promise,
+                              MessageId lastCompactedMessageId) {
         if (promise.isDone()) {
             return;
         }
@@ -238,6 +239,12 @@ public class TwoPhaseCompactor extends Compactor {
                 m.close();
                 return;
             }
+
+            if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
+                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise, lastCompactedMessageId);
+                return;
+            }
+
             try {
                 MessageId id = m.getMessageId();
                 Optional<RawMessage> messageToAdd = Optional.empty();
@@ -308,7 +315,7 @@ public class TwoPhaseCompactor extends Compactor {
                     }
                     return;
                 }
-                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise);
+                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise, m.getMessageId());
             } finally {
                 m.close();
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index f125022c187..535d6715a30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1870,4 +1870,70 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testCompactionDuplicate() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testCompactionDuplicate";
+        final int numMessages = 1000;
+        final int maxKeys = 800;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        // trigger compaction (create __compaction cursor)
+        admin.topics().triggerCompaction(topic);
+
+        Map<String, byte[]> expected = new HashMap<>();
+        Random r = new Random(0);
+
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key" + keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.newMessage().key(key).value(data).send();
+            expected.put(key, data);
+        }
+
+        producer.flush();
+
+        // trigger compaction
+        admin.topics().triggerCompaction(topic);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topic).status,
+                    LongRunningProcessStatus.Status.RUNNING);
+        });
+
+        // Wait for phase one to complete
+        Thread.sleep(500);
+
+        // Unload topic make reader of compaction reconnect
+        admin.topics().unload(topic);
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topic, false);
+            // Compacted topic ledger should have same number of entry equals 
to number of unique key.
+            Assert.assertEquals(internalStats.compactedLedger.entries, 
expected.size());
+            Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+            Assert.assertFalse(internalStats.compactedLedger.offloaded);
+        });
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            while (true) {
+                Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+                Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+                if (expected.isEmpty()) {
+                    break;
+                }
+            }
+        }
+    }
 }

Reply via email to