This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new abfc10b6d7d [improve][broker] Avoid record inactiveproducers when
deduplication is disable. (#21193)
abfc10b6d7d is described below
commit abfc10b6d7dce07defb7e295f6cb96124f3288b7
Author: lifepuzzlefun <[email protected]>
AuthorDate: Thu Dec 21 18:24:08 2023 +0800
[improve][broker] Avoid record inactiveproducers when deduplication is
disable. (#21193)
Co-authored-by: Jiwe Guo <[email protected]>
---
.../service/persistent/MessageDeduplication.java | 21 ++++++++++
.../service/persistent/MessageDuplicationTest.java | 49 +++++++++++++++++++++-
2 files changed, 69 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 238dc740509..802dd917961 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -474,6 +474,10 @@ public class MessageDeduplication {
* Topic will call this method whenever a producer connects.
*/
public void producerAdded(String producerName) {
+ if (!isEnabled()) {
+ return;
+ }
+
// Producer is no-longer inactive
inactiveProducers.remove(producerName);
}
@@ -482,6 +486,10 @@ public class MessageDeduplication {
* Topic will call this method whenever a producer disconnects.
*/
public void producerRemoved(String producerName) {
+ if (!isEnabled()) {
+ return;
+ }
+
// Producer is no-longer active
inactiveProducers.put(producerName, System.currentTimeMillis());
}
@@ -493,6 +501,14 @@ public class MessageDeduplication {
long minimumActiveTimestamp = System.currentTimeMillis() -
TimeUnit.MINUTES
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
+ // if not enabled just clear all inactive producer record.
+ if (!isEnabled()) {
+ if (!inactiveProducers.isEmpty()) {
+ inactiveProducers.clear();
+ }
+ return;
+ }
+
Iterator<Map.Entry<String, Long>> mapIterator =
inactiveProducers.entrySet().iterator();
boolean hasInactive = false;
while (mapIterator.hasNext()) {
@@ -545,5 +561,10 @@ public class MessageDeduplication {
return managedCursor;
}
+ @VisibleForTesting
+ Map<String, Long> getInactiveProducers() {
+ return inactiveProducers;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MessageDeduplication.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 19583a4455e..72eb3e8101f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
@@ -47,15 +48,22 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
-public class MessageDuplicationTest {
+public class MessageDuplicationTest extends BrokerTestBase {
private static final int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
private static final int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
@@ -438,4 +446,43 @@ public class MessageDuplicationTest {
}
});
}
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ this.conf.setBrokerDeduplicationEnabled(true);
+ super.baseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testMessageDeduplication() throws Exception {
+ String topicName = "persistent://prop/ns-abc/testMessageDeduplication";
+ String producerName = "test-producer";
+ Producer<String> producer = pulsarClient
+ .newProducer(Schema.STRING)
+ .producerName(producerName)
+ .topic(topicName)
+ .create();
+ final PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopicIfExists(topicName).get().orElse(null);
+ assertNotNull(persistentTopic);
+ final MessageDeduplication messageDeduplication =
persistentTopic.getMessageDeduplication();
+
assertFalse(messageDeduplication.getInactiveProducers().containsKey(producerName));
+ producer.close();
+ Awaitility.await().untilAsserted(() ->
assertTrue(messageDeduplication.getInactiveProducers().containsKey(producerName)));
+ admin.topicPolicies().setDeduplicationStatus(topicName, false);
+ Awaitility.await().untilAsserted(() -> {
+ final Boolean deduplicationStatus =
admin.topicPolicies().getDeduplicationStatus(topicName);
+ Assert.assertNotNull(deduplicationStatus);
+ Assert.assertFalse(deduplicationStatus);
+ });
+ messageDeduplication.purgeInactiveProducers();
+ assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
+ }
}