This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 89e005f939fa4400bc1be5a9058264d9e071622a Author: Jiwei Guo <[email protected]> AuthorDate: Wed Jun 1 11:09:01 2022 +0800 Fix NPE in MessageDeduplication. (#15820) (cherry picked from commit 01d7bfa681b23d1a236b1411b83e854c9ad9323f) --- .../pulsar/broker/service/persistent/MessageDeduplication.java | 2 +- .../pulsar/broker/service/persistent/MessageDuplicationTest.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 55d35201fe5..5b29a4f15c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -490,7 +490,7 @@ public class MessageDeduplication { hasInactive = true; } } - if (hasInactive) { + if (hasInactive && isEnabled()) { takeSnapshot(getManagedCursor().getMarkDeletedPosition()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 76caccadcc9..0e1b37b5160 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -202,6 +202,13 @@ public class MessageDuplicationTest { messageDeduplication.purgeInactiveProducers(); assertEquals(inactiveProducers.size(), 3); + doReturn(false).when(messageDeduplication).isEnabled(); + inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000); + inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000); + messageDeduplication.purgeInactiveProducers(); + assertFalse(inactiveProducers.containsKey(producerName2)); + assertFalse(inactiveProducers.containsKey(producerName3)); + doReturn(true).when(messageDeduplication).isEnabled(); // Modify the inactive time of produce2 and produce3 // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3 inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
