This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new b116c11e87a [fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)
b116c11e87a is described below
commit b116c11e87ae1cfc30fa2b4ca28f20c84ddae5b4
Author: ken <[email protected]>
AuthorDate: Sat Jul 6 06:26:28 2024 +0800
[fix][broker] Fix MessageDeduplication replay timeout cause topic loading stuck (#23004)
Co-authored-by: fanjianye <[email protected]>
(cherry picked from commit 41ef3f6fb1c0b209307d7b4e14300a377c52c5ab)
---
.../service/persistent/MessageDeduplication.java | 14 ++-
.../service/persistent/TopicDuplicationTest.java | 103 +++++++++++++++++++++
2 files changed, 113 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ab3b799093b..1715e09dc7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -159,11 +159,12 @@ public class MessageDeduplication {
log.info("[{}] Replaying {} entries for deduplication",
topic.getName(), managedCursor.getNumberOfEntries());
CompletableFuture<Position> future = new CompletableFuture<>();
replayCursor(future);
- return future.thenAccept(lastPosition -> {
+ return future.thenCompose(lastPosition -> {
if (lastPosition != null && snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
- takeSnapshot(lastPosition);
+ return takeSnapshot(lastPosition);
}
+ return CompletableFuture.completedFuture(null);
});
}
@@ -438,13 +439,15 @@ public class MessageDeduplication {
}
}
- private void takeSnapshot(Position position) {
+ private CompletableFuture<Void> takeSnapshot(Position position) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map",
topic.getName());
}
if (!snapshotTaking.compareAndSet(false, true)) {
- return;
+ future.complete(null);
+ return future;
}
Map<String, Long> snapshot = new TreeMap<>();
@@ -462,14 +465,17 @@ public class MessageDeduplication {
}
lastSnapshotTimestamp = System.currentTimeMillis();
snapshotTaking.set(false);
+ future.complete(null);
}
@Override
public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
log.warn("[{}] Failed to store new deduplication snapshot at
{}", topic.getName(), position);
snapshotTaking.set(false);
+ future.completeExceptionally(exception);
}
}, null);
+ return future;
}
/**
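In essence, takeSnapshot() changes from fire-and-forget to returning a CompletableFuture that is completed from the mark-delete callbacks, and the replay path chains onto it with thenCompose instead of discarding it with thenAccept, so callers of recoverSequenceIdsMap() only complete (or fail) once the snapshot is actually persisted. Below is a minimal, self-contained sketch of that pattern, using hypothetical names (SaveCallback, asyncSaveSnapshot) rather than the BookKeeper/Pulsar APIs:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SnapshotChainingSketch {

        // Stand-in for a callback-style API such as the cursor's async mark-delete.
        interface SaveCallback {
            void saveComplete();
            void saveFailed(Exception e);
        }

        private static final ScheduledExecutorService EXECUTOR =
                Executors.newSingleThreadScheduledExecutor();

        // Callback-style "store the snapshot" operation (simulated with a delayed task).
        static void asyncSaveSnapshot(SaveCallback cb) {
            EXECUTOR.schedule(() -> cb.saveComplete(), 200, TimeUnit.MILLISECONDS);
        }

        // Bridge the callback API into a CompletableFuture, the same shape takeSnapshot
        // now has: complete(null) on success, completeExceptionally(...) on failure.
        static CompletableFuture<Void> takeSnapshot() {
            CompletableFuture<Void> future = new CompletableFuture<>();
            asyncSaveSnapshot(new SaveCallback() {
                @Override
                public void saveComplete() {
                    future.complete(null);
                }

                @Override
                public void saveFailed(Exception e) {
                    future.completeExceptionally(e);
                }
            });
            return future;
        }

        // Chain with thenCompose so the caller's future tracks the snapshot itself,
        // instead of completing as soon as the snapshot is merely started.
        static CompletableFuture<Void> recover(CompletableFuture<Long> replay) {
            return replay.thenCompose(lastPosition -> {
                if (lastPosition != null) {
                    return takeSnapshot();
                }
                return CompletableFuture.completedFuture(null);
            });
        }

        public static void main(String[] args) {
            CompletableFuture<Void> done = recover(CompletableFuture.completedFuture(42L));
            System.out.println("done immediately? " + done.isDone()); // false: waits for snapshot
            done.join();
            System.out.println("snapshot persisted, recovery future completed");
            EXECUTOR.shutdown();
        }
    }

With the old thenAccept form, the returned future would complete immediately and a failure inside the snapshot callback could not propagate to the caller.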
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 16721ca1203..7069a843e98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -25,6 +26,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.lang.reflect.Field;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -33,12 +35,18 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -529,6 +537,101 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
persistentTopic.checkDeduplicationSnapshot();
}
+ @Test
+ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception {
+ cleanup();
+ setup();
+
+ // Create a topic and wait until deduplication is started.
+ int brokerDeduplicationEntriesInterval = 1000;
+ pulsar.getConfiguration().setBrokerDeduplicationEnabled(true);
+ pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
+ final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topic);
+ final PersistentTopic persistentTopic1 =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+ final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
+ Awaitility.await().untilAsserted(() -> {
+ ManagedCursorImpl cursor1 =
+ (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+ assertNotNull(cursor1);
+ });
+ final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();
+
+
+ // Send 999 messages, which is less than "brokerDeduplicationEntriesInterval",
+ // so it does not trigger takeSnapshot.
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic).enableBatching(false).create();
+ for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
+ producer.send(i + "");
+ }
+ producer.close();
+ int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
+ assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
+
+
+ // Unload and reload the topic to simulate a topic load timeout.
+ // Set brokerDeduplicationEntriesInterval to 10, so recoverSequenceIdsMap#takeSnapshot
+ // triggers and should update the snapshot position.
+ // However, if the topic close and takeSnapshot run concurrently,
+ // takeSnapshot may throw an exception.
+ admin.topics().unload(topic);
+ pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10);
+
+ // Make message deduplication recovery slower than topicLoadTimeoutSeconds so the topic load times out.
+ pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1);
+ String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
+ TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
+ mockZooKeeper.delay(2 * 1000, (op, path) -> {
+ if (mlPath.equals(path)) {
+ return true;
+ }
+ return false;
+ });
+
+ Field field2 = BrokerService.class.getDeclaredField("topics");
+ field2.setAccessible(true);
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
+ (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>)
+ field2.get(pulsar.getBrokerService());
+
+ try {
+ pulsar.getBrokerService().getTopic(topic, false).join().get();
+ Assert.fail();
+ } catch (Exception e) {
+ // Topic loading should time out.
+ }
+ Awaitility.await().untilAsserted(() -> {
+ // After the load timeout, the topic is closed and removed from the topics map.
+ Assert.assertFalse(topics.containsKey(topic));
+ });
+
+
+ // Load the topic again with brokerDeduplicationEntriesInterval set to 10000,
+ // so recoverSequenceIdsMap#takeSnapshot does not take a snapshot.
+ // The replay should not run again in recoverSequenceIdsMap,
+ // since the previous topic load already finished the replay process.
+ pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000);
+ pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60);
+ PersistentTopic persistentTopic2 =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+ ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
+ MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();
+
+ Awaitility.await().untilAsserted(() -> {
+ int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
+ Assert.assertEquals(snapshotCounter3, 0);
+ Assert.assertEquals(ml2.getLedgersInfo().size(), 1);
+ });
+
+
+ // cleanup.
+ admin.topics().delete(topic);
+ cleanup();
+ setup();
+ }
+
private void waitCacheInit(String topicName) throws Exception {
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);
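The test inspects private broker state by reflection: BrokerService#topics via java.lang.reflect.Field, and MessageDeduplication#snapshotCounter via Awaitility's WhiteboxImpl.getInternalState. For readers unfamiliar with that helper, this is a rough, self-contained sketch of the same idea, using a hypothetical Holder class rather than the broker classes:

    import java.lang.reflect.Field;

    public class ReadPrivateField {
        static class Holder {
            private int counter = 7;
        }

        // Read a private field from an object, roughly what WhiteboxImpl.getInternalState does.
        @SuppressWarnings("unchecked")
        static <T> T getInternalState(Object target, String fieldName) throws Exception {
            Field f = target.getClass().getDeclaredField(fieldName);
            f.setAccessible(true);
            return (T) f.get(target);
        }

        public static void main(String[] args) throws Exception {
            Holder holder = new Holder();
            int counter = ReadPrivateField.<Integer>getInternalState(holder, "counter");
            System.out.println(counter); // prints 7
        }
    }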