This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 3b02d90d745 [Broker] Fix messageDedup delete inactive producer name
(#12493)
3b02d90d745 is described below
commit 3b02d90d74594e67db852c18e752d3514168dc52
Author: congbo <[email protected]>
AuthorDate: Wed Oct 27 10:01:39 2021 +0800
[Broker] Fix messageDedup delete inactive producer name (#12493)
Now, remove inactive producerName in MessageDeduplication when producer
close. But the producer has been closed before topic unload, this producerName
will not be remove if producer don't connect broker with the same producerName.
When topic recover `MessageDeduplication`, we should put every producerName
into inactive producerNameMap. When producer with the same name, we will remove
it from the inactive map, if this producerName can not connect within
brokerDeduplicationProducerInactivityTimeoutMinutes, we can remove it.
(cherry picked from commit 928924b5a37dbab8eea791200d328739d72da016)
---
.../service/persistent/MessageDeduplication.java | 2 +
.../service/persistent/MessageDuplicationTest.java | 91 ++++++++++++++++++----
2 files changed, 78 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e40b70c437a..12c6bccfecf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -143,6 +143,7 @@ public class MessageDeduplication {
private CompletableFuture<Void> recoverSequenceIdsMap() {
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
+ inactiveProducers.put(k, System.currentTimeMillis());
highestSequencedPushed.put(k, v);
highestSequencedPersisted.put(k, v);
});
@@ -174,6 +175,7 @@ public class MessageDeduplication {
long sequenceId = Math.max(md.getHighestSequenceId(),
md.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
+ inactiveProducers.put(producerName,
System.currentTimeMillis());
md.recycle();
entry.release();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 5cfdef839db..e172ac27115 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -18,7 +18,24 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -29,25 +46,11 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
@Slf4j
public class MessageDuplicationTest {
@@ -146,6 +149,64 @@ public class MessageDuplicationTest {
assertEquals(lastSequenceIdPushed.longValue(), 5);
}
+ @Test
+ public void testInactiveProducerRemove() throws Exception {
+ PulsarService pulsarService = mock(PulsarService.class);
+ PersistentTopic topic = mock(PersistentTopic.class);
+ ManagedLedger managedLedger = mock(ManagedLedger.class);
+
+ ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+
serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
+
serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
+ serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
+
serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
+
+ doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+ MessageDeduplication messageDeduplication = spy(new
MessageDeduplication(pulsarService, topic, managedLedger));
+ doReturn(true).when(messageDeduplication).isEnabled();
+
+ Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
+
+ Field field =
MessageDeduplication.class.getDeclaredField("inactiveProducers");
+ field.setAccessible(true);
+ Map<String, Long> map = (Map<String, Long>)
field.get(messageDeduplication);
+
+ String producerName1 = "test1";
+ when(publishContext.getHighestSequenceId()).thenReturn(2L);
+ when(publishContext.getSequenceId()).thenReturn(1L);
+ when(publishContext.getProducerName()).thenReturn(producerName1);
+ messageDeduplication.isDuplicate(publishContext, null);
+
+ String producerName2 = "test2";
+ when(publishContext.getProducerName()).thenReturn(producerName2);
+ messageDeduplication.isDuplicate(publishContext, null);
+
+ String producerName3 = "test3";
+ when(publishContext.getProducerName()).thenReturn(producerName3);
+ messageDeduplication.isDuplicate(publishContext, null);
+
+ messageDeduplication.producerRemoved(producerName1);
+ assertTrue(map.containsKey(producerName1));
+ messageDeduplication.producerAdded(producerName1);
+ assertFalse(map.containsKey(producerName1));
+ messageDeduplication.purgeInactiveProducers();
+ // messageDeduplication.purgeInactiveProducers() will remove producer2
and producer3
+ map.put(producerName2, System.currentTimeMillis() - 70000);
+ map.put(producerName3, System.currentTimeMillis() - 70000);
+ messageDeduplication.purgeInactiveProducers();
+ assertFalse(map.containsKey(producerName2));
+ assertFalse(map.containsKey(producerName3));
+
+ field =
MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
+ field.setAccessible(true);
+ ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
(ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
+
+ assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
+ assertFalse(highestSequencedPushed.containsKey(producerName2));
+ assertFalse(highestSequencedPushed.containsKey(producerName3));
+
+ }
+
@Test
public void testIsDuplicateWithFailure() {