This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1280225a402 [fix][broker] Fix the publish latency spike from the 
contention of MessageDeduplication (#20647)
1280225a402 is described below

commit 1280225a402075bfd27c94dc316cc05c2ae54eb8
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jun 29 10:54:31 2023 +0800

    [fix][broker] Fix the publish latency spike from the contention of 
MessageDeduplication (#20647)
    
    (cherry picked from commit 31c5b4dacce52f9f3698050060bafb5192fb6787)
---
 .../pulsar/broker/service/persistent/MessageDeduplication.java | 10 +++++-----
 .../broker/service/persistent/MessageDuplicationTest.java      |  3 ++-
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 030d74a014f..ed4e70bfd29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -20,12 +20,12 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
@@ -126,7 +126,7 @@ public class MessageDeduplication {
     private final int maxNumberOfProducers;
 
     // Map used to track the inactive producer along with the timestamp of 
their last activity
-    private final Map<String, Long> inactiveProducers = new HashMap<>();
+    private final Map<String, Long> inactiveProducers = new 
ConcurrentHashMap<>();
 
     private final String replicatorPrefix;
 
@@ -436,7 +436,7 @@ public class MessageDeduplication {
     /**
      * Topic will call this method whenever a producer connects.
      */
-    public synchronized void producerAdded(String producerName) {
+    public void producerAdded(String producerName) {
         // Producer is no-longer inactive
         inactiveProducers.remove(producerName);
     }
@@ -444,7 +444,7 @@ public class MessageDeduplication {
     /**
      * Topic will call this method whenever a producer disconnects.
      */
-    public synchronized void producerRemoved(String producerName) {
+    public void producerRemoved(String producerName) {
         // Producer is no-longer active
         inactiveProducers.put(producerName, System.currentTimeMillis());
     }
@@ -456,7 +456,7 @@ public class MessageDeduplication {
         long minimumActiveTimestamp = System.currentTimeMillis() - 
TimeUnit.MINUTES
                 
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
-        Iterator<java.util.Map.Entry<String, Long>> mapIterator = 
inactiveProducers.entrySet().iterator();
+        Iterator<Map.Entry<String, Long>> mapIterator = 
inactiveProducers.entrySet().iterator();
         boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index b4152b143ab..19583a4455e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -36,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -174,7 +175,7 @@ public class MessageDuplicationTest {
 
         Field field = 
MessageDeduplication.class.getDeclaredField("inactiveProducers");
         field.setAccessible(true);
-        Map<String, Long> inactiveProducers = (Map<String, Long>) 
field.get(messageDeduplication);
+        Map<String, Long> inactiveProducers = (ConcurrentHashMap<String, 
Long>) field.get(messageDeduplication);
 
         String producerName1 = "test1";
         when(publishContext.getHighestSequenceId()).thenReturn(2L);

Reply via email to