This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 928924b  [Broker] Fix messageDedup delete inactive producer name 
(#12493)
928924b is described below

commit 928924b5a37dbab8eea791200d328739d72da016
Author: congbo <[email protected]>
AuthorDate: Wed Oct 27 10:01:39 2021 +0800

    [Broker] Fix messageDedup delete inactive producer name (#12493)
    
    ## Issue
    Currently, an inactive producerName is removed from MessageDeduplication 
after the producer closes. But if the producer was closed before the topic was 
unloaded, its producerName will never be removed unless a producer reconnects 
to the broker with the same producerName.
    
    ## Implementation
    
    When a topic recovers its `MessageDeduplication` state, we put every 
recovered producerName into the inactive producers map. When a producer with 
the same name connects, we remove that name from the inactive map; if no 
producer with this producerName connects within 
brokerDeduplicationProducerInactivityTimeoutMinutes, the producerName can be 
removed.
---
 .../service/persistent/MessageDeduplication.java   |  2 +
 .../service/persistent/MessageDuplicationTest.java | 67 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 0df50cb..b2c42b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -138,6 +138,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
+            inactiveProducers.put(k, System.currentTimeMillis());
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -168,6 +169,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), 
md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
+                    inactiveProducers.put(producerName, 
System.currentTimeMillis());
 
                     entry.release();
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index bfef2b7..4dc7f7f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -30,7 +30,13 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
 import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -40,8 +46,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Slf4j
 @Test(groups = "broker")
@@ -143,6 +152,64 @@ public class MessageDuplicationTest {
     }
 
     @Test
+    public void testInactiveProducerRemove() throws Exception {
+        PulsarService pulsarService = mock(PulsarService.class);
+        PersistentTopic topic = mock(PersistentTopic.class);
+        ManagedLedger managedLedger = mock(ManagedLedger.class);
+
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        
serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
+        
serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
+        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
+        
serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
+
+        doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+        MessageDeduplication messageDeduplication = spy(new 
MessageDeduplication(pulsarService, topic, managedLedger));
+        doReturn(true).when(messageDeduplication).isEnabled();
+
+        Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
+
+        Field field = 
MessageDeduplication.class.getDeclaredField("inactiveProducers");
+        field.setAccessible(true);
+        Map<String, Long> map = (Map<String, Long>) 
field.get(messageDeduplication);
+
+        String producerName1 = "test1";
+        when(publishContext.getHighestSequenceId()).thenReturn(2L);
+        when(publishContext.getSequenceId()).thenReturn(1L);
+        when(publishContext.getProducerName()).thenReturn(producerName1);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        String producerName2 = "test2";
+        when(publishContext.getProducerName()).thenReturn(producerName2);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        String producerName3 = "test3";
+        when(publishContext.getProducerName()).thenReturn(producerName3);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        messageDeduplication.producerRemoved(producerName1);
+        assertTrue(map.containsKey(producerName1));
+        messageDeduplication.producerAdded(producerName1);
+        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.purgeInactiveProducers();
+        // messageDeduplication.purgeInactiveProducers() will remove producer2 
and producer3
+        map.put(producerName2, System.currentTimeMillis() - 70000);
+        map.put(producerName3, System.currentTimeMillis() - 70000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(map.containsKey(producerName2));
+        assertFalse(map.containsKey(producerName3));
+
+        field = 
MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
+        field.setAccessible(true);
+        ConcurrentOpenHashMap<String, Long> highestSequencedPushed = 
(ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
+
+        assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
+        assertFalse(highestSequencedPushed.containsKey(producerName2));
+        assertFalse(highestSequencedPushed.containsKey(producerName3));
+
+    }
+
+    @Test
     public void testIsDuplicateWithFailure() {
 
         PulsarService pulsarService = mock(PulsarService.class);

Reply via email to