This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 3b02d90d745 [Broker] Fix messageDedup delete inactive producer name 
(#12493)
3b02d90d745 is described below

commit 3b02d90d74594e67db852c18e752d3514168dc52
Author: congbo <[email protected]>
AuthorDate: Wed Oct 27 10:01:39 2021 +0800

    [Broker] Fix messageDedup delete inactive producer name (#12493)
    
    Currently, an inactive producerName is removed from MessageDeduplication 
only when the producer closes. But if the producer was closed before the topic 
was unloaded, this producerName will never be removed unless a producer 
reconnects to the broker with the same producerName.
    
    When a topic recovers `MessageDeduplication`, we should put every 
producerName into the inactive producer map. When a producer with the same name 
connects, we remove it from the inactive map; if no producer with this 
producerName connects within 
brokerDeduplicationProducerInactivityTimeoutMinutes, the producerName can be 
removed.
    
    (cherry picked from commit 928924b5a37dbab8eea791200d328739d72da016)
---
 .../service/persistent/MessageDeduplication.java   |  2 +
 .../service/persistent/MessageDuplicationTest.java | 91 ++++++++++++++++++----
 2 files changed, 78 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e40b70c437a..12c6bccfecf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -143,6 +143,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
+            inactiveProducers.put(k, System.currentTimeMillis());
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -174,6 +175,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), 
md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
+                    inactiveProducers.put(producerName, 
System.currentTimeMillis());
 
                     md.recycle();
                     entry.release();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 5cfdef839db..e172ac27115 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -18,7 +18,24 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -29,25 +46,11 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.ScheduledExecutorService;
-
-import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 public class MessageDuplicationTest {
 
@@ -146,6 +149,64 @@ public class MessageDuplicationTest {
         assertEquals(lastSequenceIdPushed.longValue(), 5);
     }
 
+    @Test
+    public void testInactiveProducerRemove() throws Exception {
+        PulsarService pulsarService = mock(PulsarService.class);
+        PersistentTopic topic = mock(PersistentTopic.class);
+        ManagedLedger managedLedger = mock(ManagedLedger.class);
+
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        
serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
+        
serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
+        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
+        
serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
+
+        doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+        MessageDeduplication messageDeduplication = spy(new 
MessageDeduplication(pulsarService, topic, managedLedger));
+        doReturn(true).when(messageDeduplication).isEnabled();
+
+        Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
+
+        Field field = 
MessageDeduplication.class.getDeclaredField("inactiveProducers");
+        field.setAccessible(true);
+        Map<String, Long> map = (Map<String, Long>) 
field.get(messageDeduplication);
+
+        String producerName1 = "test1";
+        when(publishContext.getHighestSequenceId()).thenReturn(2L);
+        when(publishContext.getSequenceId()).thenReturn(1L);
+        when(publishContext.getProducerName()).thenReturn(producerName1);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        String producerName2 = "test2";
+        when(publishContext.getProducerName()).thenReturn(producerName2);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        String producerName3 = "test3";
+        when(publishContext.getProducerName()).thenReturn(producerName3);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        messageDeduplication.producerRemoved(producerName1);
+        assertTrue(map.containsKey(producerName1));
+        messageDeduplication.producerAdded(producerName1);
+        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.purgeInactiveProducers();
+        // messageDeduplication.purgeInactiveProducers() will remove producer2 
and producer3
+        map.put(producerName2, System.currentTimeMillis() - 70000);
+        map.put(producerName3, System.currentTimeMillis() - 70000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(map.containsKey(producerName2));
+        assertFalse(map.containsKey(producerName3));
+
+        field = 
MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
+        field.setAccessible(true);
+        ConcurrentOpenHashMap<String, Long> highestSequencedPushed = 
(ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
+
+        assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
+        assertFalse(highestSequencedPushed.containsKey(producerName2));
+        assertFalse(highestSequencedPushed.containsKey(producerName3));
+
+    }
+
     @Test
     public void testIsDuplicateWithFailure() {
 

Reply via email to