This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 40d32542e06 [fix][broker] Prevent dedup recovery race from allowing
duplicate messages (#25953)
40d32542e06 is described below
commit 40d32542e06507e752a8725f03dbfb3de25f086c
Author: void-ptr974 <[email protected]>
AuthorDate: Mon Jun 8 07:04:35 2026 +0800
[fix][broker] Prevent dedup recovery race from allowing duplicate messages
(#25953)
---
.../service/persistent/MessageDeduplication.java | 35 +++++++++---
.../broker/BrokerMessageDeduplicationTest.java | 64 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 2db01ccc946..a980556f49b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -104,6 +104,7 @@ public class MessageDeduplication {
private volatile Status status;
+ private CompletableFuture<Void> statusChangeFuture =
CompletableFuture.completedFuture(null);
// Map that contains the highest sequenceId that have been sent by each
producers. The map will be updated before
// the messages are persisted
@@ -159,7 +160,10 @@ public class MessageDeduplication {
public CompletableFuture<Void> checkStatus() {
boolean shouldBeEnabled = topic.isDeduplicationEnabled();
synchronized (this) {
- if (status == Status.Recovering || status == Status.Removing) {
+ if (status == Status.Recovering) {
+ return statusChangeFuture;
+ }
+ if (status == Status.Removing) {
// If there's already a transition happening, check later for
status
pulsar.getExecutor().schedule(this::checkStatus, 1,
TimeUnit.MINUTES);
return CompletableFuture.completedFuture(null);
@@ -224,17 +228,27 @@ public class MessageDeduplication {
}, null);
return future;
- } else if ((status == Status.Disabled || status ==
Status.Initialized) && shouldBeEnabled) {
+ } else if ((status == Status.Disabled || status ==
Status.Initialized || status == Status.Failed)
+ && shouldBeEnabled) {
// Enable deduping
- final var future = openCursor(managedLedger,
PersistentTopic.DEDUPLICATION_CURSOR_NAME)
- .thenCompose(this::replayCursor);
- future.exceptionally(e -> {
+ status = Status.Recovering;
+ final CompletableFuture<Void> future;
+ try {
+ future = openCursor(managedLedger,
PersistentTopic.DEDUPLICATION_CURSOR_NAME)
+ .thenCompose(this::replayCursor);
+ } catch (Throwable e) {
status = Status.Failed;
+ statusChangeFuture = CompletableFuture.failedFuture(e);
log.error().exception(e).log("Failed to enable
deduplication");
- future.completeExceptionally(e);
- return null;
+ return statusChangeFuture;
+ }
+ statusChangeFuture = future.whenComplete((__, e) -> {
+ if (e != null) {
+ status = Status.Failed;
+ log.error().exception(e).log("Failed to enable
deduplication");
+ }
});
- return future;
+ return statusChangeFuture;
} else {
// Nothing to do, we are in the correct state
return CompletableFuture.completedFuture(null);
@@ -244,6 +258,11 @@ public class MessageDeduplication {
private CompletableFuture<Void> replayCursor(ManagedCursor cursor) {
managedCursor = cursor;
+ cursor.rewind();
+ snapshotCounter = 0;
+ highestSequencedPushed.clear();
+ highestSequencedPersisted.clear();
+ inactiveProducers.clear();
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
producerRemoved(k);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
index c83803d9b9f..f36caf87e6c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
@@ -25,7 +25,10 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
@@ -35,6 +38,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -129,5 +134,64 @@ public class BrokerMessageDeduplicationTest {
assertTrue(e.getCause() instanceof RuntimeException);
assertTrue(e.getMessage().contains("asyncReadEntries failed"));
}
+ assertEquals(String.valueOf(deduplication.getStatus()), "Failed");
+ }
+
+ @Test
+ public void checkStatusDoesNotStartMultipleRecoveries() throws Exception {
+ final var cursor = mock(ManagedCursor.class);
+ final var openCursorCallback = new
AtomicReference<AsyncCallbacks.OpenCursorCallback>();
+ doAnswer(invocation -> {
+ openCursorCallback.set(invocation.getArgument(1));
+ return null;
+ }).when(managedLedger).asyncOpenCursor(any(), any(), any());
+ doReturn(Map.of()).when(cursor).getProperties();
+ doReturn(false).when(cursor).hasMoreEntries();
+
+ final var firstCheckStatus = deduplication.checkStatus();
+ assertFalse(firstCheckStatus.isDone());
+ assertEquals(String.valueOf(deduplication.getStatus()), "Recovering");
+
+ final var secondCheckStatus = deduplication.checkStatus();
+ assertFalse(secondCheckStatus.isDone());
+ verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any());
+
+ openCursorCallback.get().openCursorComplete(cursor, null);
+ firstCheckStatus.get(3, TimeUnit.SECONDS);
+ secondCheckStatus.get(3, TimeUnit.SECONDS);
+ assertEquals(String.valueOf(deduplication.getStatus()), "Enabled");
+ verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any());
+ }
+
+ @Test
+ public void checkStatusRetriesAfterFailedEnable() throws Exception {
+ final var cursor = mock(ManagedCursor.class);
+ doAnswer(invocation -> {
+ ((AsyncCallbacks.OpenCursorCallback)
invocation.getArgument(1)).openCursorComplete(cursor, null);
+ return null;
+ }).when(managedLedger).asyncOpenCursor(any(), any(), any());
+ doReturn(Map.of("from-snapshot", 10L)).when(cursor).getProperties();
+
+ final var hasMoreEntriesCalls = new AtomicInteger();
+ doAnswer(invocation -> hasMoreEntriesCalls.getAndIncrement() ==
0).when(cursor).hasMoreEntries();
+ doAnswer(invocation -> {
+ throw new RuntimeException("asyncReadEntries failed");
+ }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(),
any());
+
+ try {
+ deduplication.checkStatus().get(3, TimeUnit.SECONDS);
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof RuntimeException);
+ assertTrue(e.getMessage().contains("asyncReadEntries failed"));
+ }
+ assertEquals(String.valueOf(deduplication.getStatus()), "Failed");
+
+ final var retry = deduplication.checkStatus();
+ retry.get(3, TimeUnit.SECONDS);
+ assertEquals(String.valueOf(deduplication.getStatus()), "Enabled");
+
assertEquals(deduplication.getLastPublishedSequenceId("from-snapshot"), 10L);
+ verify(cursor, times(2)).rewind();
+ verify(managedLedger, times(2)).asyncOpenCursor(any(), any(), any());
}
}