This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3a30526ac93 [fix][broker] Fix write duplicate entries into the
compacted ledger after RawReader reconnects (#21081) (#21165)
3a30526ac93 is described below
commit 3a30526ac936101d00eec6c053a6119c091be382
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Sep 18 18:15:14 2023 +0800
[fix][broker] Fix write duplicate entries into the compacted ledger after
RawReader reconnects (#21081) (#21165)
---
.../pulsar/compaction/TwoPhaseCompactor.java | 13 ++++-
.../apache/pulsar/compaction/CompactionTest.java | 66 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 886a523df02..be08bd81c1e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -207,7 +207,7 @@ public class TwoPhaseCompactor extends Compactor {
reader.seekAsync(from).thenCompose((v) -> {
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
- phaseTwoLoop(reader, to, latestForKey, ledger, outstanding,
loopPromise);
+ phaseTwoLoop(reader, to, latestForKey, ledger, outstanding,
loopPromise, MessageId.earliest);
return loopPromise;
}).thenCompose((v) -> closeLedger(ledger))
.thenCompose((v) ->
reader.acknowledgeCumulativeAsync(lastReadId,
@@ -229,7 +229,8 @@ public class TwoPhaseCompactor extends Compactor {
}
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String,
MessageId> latestForKey,
- LedgerHandle lh, Semaphore outstanding,
CompletableFuture<Void> promise) {
+ LedgerHandle lh, Semaphore outstanding,
CompletableFuture<Void> promise,
+ MessageId lastCompactedMessageId) {
if (promise.isDone()) {
return;
}
@@ -238,6 +239,12 @@ public class TwoPhaseCompactor extends Compactor {
m.close();
return;
}
+
+ if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise, lastCompactedMessageId);
+ return;
+ }
+
try {
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
@@ -308,7 +315,7 @@ public class TwoPhaseCompactor extends Compactor {
}
return;
}
- phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise);
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise, m.getMessageId());
} finally {
m.close();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index f125022c187..535d6715a30 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1870,4 +1870,70 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
consumer.close();
producer.close();
}
+
+ @Test
+ public void testCompactionDuplicate() throws Exception {
+ String topic =
"persistent://my-property/use/my-ns/testCompactionDuplicate";
+ final int numMessages = 1000;
+ final int maxKeys = 800;
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ // trigger compaction (create __compaction cursor)
+ admin.topics().triggerCompaction(topic);
+
+ Map<String, byte[]> expected = new HashMap<>();
+ Random r = new Random(0);
+
+
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+ for (int j = 0; j < numMessages; j++) {
+ int keyIndex = r.nextInt(maxKeys);
+ String key = "key" + keyIndex;
+ byte[] data = ("my-message-" + key + "-" + j).getBytes();
+ producer.newMessage().key(key).value(data).send();
+ expected.put(key, data);
+ }
+
+ producer.flush();
+
+ // trigger compaction
+ admin.topics().triggerCompaction(topic);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topic).status,
+ LongRunningProcessStatus.Status.RUNNING);
+ });
+
+ // Wait for phase one to complete
+ Thread.sleep(500);
+
+ // Unload topic make reader of compaction reconnect
+ admin.topics().unload(topic);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic, false);
+ // Compacted topic ledger should have same number of entry equals
to number of unique key.
+ Assert.assertEquals(internalStats.compactedLedger.entries,
expected.size());
+ Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+ Assert.assertFalse(internalStats.compactedLedger.offloaded);
+ });
+
+ // consumer with readCompacted enabled only get compacted entries
+ try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe()) {
+ while (true) {
+ Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+ if (expected.isEmpty()) {
+ break;
+ }
+ }
+ }
+ }
}