This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4349e9f7896 [fix][broker] fix MessageDeduplication throw NPE when
enable broker dedup and set namespace disable deduplication. (#20905)
4349e9f7896 is described below
commit 4349e9f789692761a39c999dad44f9af33534208
Author: lifepuzzlefun <[email protected]>
AuthorDate: Tue Aug 1 13:16:25 2023 +0800
[fix][broker] fix MessageDeduplication throw NPE when enable broker dedup
and set namespace disable deduplication. (#20905)
---
.../service/persistent/MessageDeduplication.java | 4 +++
.../service/persistent/TopicDuplicationTest.java | 37 ++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ed4e70bfd29..490be4a8876 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -482,6 +482,10 @@ public class MessageDeduplication {
}
public void takeSnapshot() {
+ if (!isEnabled()) {
+ return;
+ }
+
Integer interval =
topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
long currentTimeStamp = System.currentTimeMillis();
if (interval == null || interval <= 0
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index e57092d02dd..16721ca1203 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -492,6 +492,43 @@ public class TopicDuplicationTest extends
ProducerConsumerBase {
}
+ @Test(timeOut = 30000)
+ public void
testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() throws
Exception {
+ cleanup();
+ conf.setBrokerDeduplicationEnabled(true);
+ conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
+ conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
+ conf.setBrokerDeduplicationEntriesInterval(20000);
+ setup();
+
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ final String producerName = "my-producer";
+ @Cleanup
+ Producer<String> producer = pulsarClient
+
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
+
+ // disable deduplication
+ admin.namespaces().setDeduplicationStatus(myNamespace, false);
+
+ int msgNum = 50;
+ CountDownLatch countDownLatch = new CountDownLatch(msgNum);
+ for (int i = 0; i < msgNum; i++) {
+ producer.newMessage().value("msg" +
i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
+ }
+ countDownLatch.await();
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopicIfExists(topicName).get().get();
+ ManagedCursor managedCursor =
persistentTopic.getMessageDeduplication().getManagedCursor();
+
+ // when deduplication is disabled for the topic, the cursor should be deleted.
+ assertNull(managedCursor);
+
+ // this method is called from brokerService forEachTopic.
+ // if deduplication is disabled at the topic level,
+ // it should be skipped without throwing an exception.
+ persistentTopic.checkDeduplicationSnapshot();
+ }
+
private void waitCacheInit(String topicName) throws Exception {
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);