This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new abfc10b6d7d [improve][broker] Avoid record inactiveproducers when 
deduplication is disable. (#21193)
abfc10b6d7d is described below

commit abfc10b6d7dce07defb7e295f6cb96124f3288b7
Author: lifepuzzlefun <[email protected]>
AuthorDate: Thu Dec 21 18:24:08 2023 +0800

    [improve][broker] Avoid record inactiveproducers when deduplication is 
disable. (#21193)
    
    Co-authored-by: Jiwe Guo <[email protected]>
---
 .../service/persistent/MessageDeduplication.java   | 21 ++++++++++
 .../service/persistent/MessageDuplicationTest.java | 49 +++++++++++++++++++++-
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 238dc740509..802dd917961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -474,6 +474,10 @@ public class MessageDeduplication {
      * Topic will call this method whenever a producer connects.
      */
     public void producerAdded(String producerName) {
+        if (!isEnabled()) {
+            return;
+        }
+
         // Producer is no-longer inactive
         inactiveProducers.remove(producerName);
     }
@@ -482,6 +486,10 @@ public class MessageDeduplication {
      * Topic will call this method whenever a producer disconnects.
      */
     public void producerRemoved(String producerName) {
+        if (!isEnabled()) {
+            return;
+        }
+
         // Producer is no-longer active
         inactiveProducers.put(producerName, System.currentTimeMillis());
     }
@@ -493,6 +501,14 @@ public class MessageDeduplication {
         long minimumActiveTimestamp = System.currentTimeMillis() - 
TimeUnit.MINUTES
                 
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
+        // if not enabled just clear all inactive producer record.
+        if (!isEnabled()) {
+            if (!inactiveProducers.isEmpty()) {
+                inactiveProducers.clear();
+            }
+            return;
+        }
+
         Iterator<Map.Entry<String, Long>> mapIterator = 
inactiveProducers.entrySet().iterator();
         boolean hasInactive = false;
         while (mapIterator.hasNext()) {
@@ -545,5 +561,10 @@ public class MessageDeduplication {
         return managedCursor;
     }
 
+    @VisibleForTesting
+    Map<String, Long> getInactiveProducers() {
+        return inactiveProducers;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MessageDeduplication.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 19583a4455e..72eb3e8101f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
@@ -47,15 +48,22 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker")
-public class MessageDuplicationTest {
+public class MessageDuplicationTest extends BrokerTestBase {
 
     private static final int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
     private static final int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
@@ -438,4 +446,43 @@ public class MessageDuplicationTest {
             }
         });
     }
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setBrokerDeduplicationEnabled(true);
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMessageDeduplication() throws Exception {
+        String topicName = "persistent://prop/ns-abc/testMessageDeduplication";
+        String producerName = "test-producer";
+        Producer<String> producer = pulsarClient
+                .newProducer(Schema.STRING)
+                .producerName(producerName)
+                .topic(topicName)
+                .create();
+        final PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopicIfExists(topicName).get().orElse(null);
+        assertNotNull(persistentTopic);
+        final MessageDeduplication messageDeduplication = 
persistentTopic.getMessageDeduplication();
+        
assertFalse(messageDeduplication.getInactiveProducers().containsKey(producerName));
+        producer.close();
+        Awaitility.await().untilAsserted(() -> 
assertTrue(messageDeduplication.getInactiveProducers().containsKey(producerName)));
+        admin.topicPolicies().setDeduplicationStatus(topicName, false);
+        Awaitility.await().untilAsserted(() -> {
+                    final Boolean deduplicationStatus = 
admin.topicPolicies().getDeduplicationStatus(topicName);
+                    Assert.assertNotNull(deduplicationStatus);
+                    Assert.assertFalse(deduplicationStatus);
+                });
+        messageDeduplication.purgeInactiveProducers();
+        assertTrue(messageDeduplication.getInactiveProducers().isEmpty());
+    }
 }

Reply via email to