This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d369762fb8776401039405ebba9057fe2c66b0e
Author: JiangHaiting <[email protected]>
AuthorDate: Mon Nov 15 08:52:31 2021 +0800

    [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster 
(#12729)
    
    See #12723
    
    Add a method 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue 
to remove null value   in a thread safe way.
    
    (cherry picked from commit a3fe00efc4ccba55a0f28fd02b535c6624e3ed0a)
---
 .../broker/service/persistent/PersistentTopic.java |  6 ++--
 .../util/collections/ConcurrentOpenHashMap.java    | 26 ++++++++++++++++++
 .../collections/ConcurrentOpenHashMapTest.java     | 32 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c232071..275c7ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1552,7 +1552,7 @@ public class PersistentTopic extends AbstractTopic
                         .thenApply(clusterData ->
                                 
brokerService.getReplicationClient(remoteCluster, clusterData)))
                 .thenAccept(replicationClient -> {
-                    replicators.computeIfAbsent(remoteCluster, r -> {
+                    Replicator replicator = 
replicators.computeIfAbsent(remoteCluster, r -> {
                         try {
                             return new 
PersistentReplicator(PersistentTopic.this, cursor, localCluster,
                                     remoteCluster, brokerService, 
(PulsarClientImpl) replicationClient);
@@ -1563,8 +1563,8 @@ public class PersistentTopic extends AbstractTopic
                     });
 
                     // clean up replicator if startup is failed
-                    if (replicators.containsKey(remoteCluster) && 
replicators.get(remoteCluster) == null) {
-                        replicators.remove(remoteCluster);
+                    if (replicator == null) {
+                        replicators.removeNullValue(remoteCluster);
                     }
                 });
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 47927a9..2c7eed1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap<K, V> {
     private static final Object EmptyKey = null;
     private static final Object DeletedKey = new Object();
 
+    /**
+     * This object is used to delete empty value in this map.
+     * EmptyValue.equals(null) = true.
+     */
+    private static final Object EmptyValue = new Object() {
+
+        @SuppressFBWarnings
+        @Override
+        public boolean equals(Object obj) {
+            return obj == null;
+        }
+
+        /**
+         * This is just for avoiding spotbugs errors
+         */
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    };
+
     private static final float MapFillFactor = 0.66f;
 
     private static final int DefaultExpectedItems = 256;
@@ -142,6 +164,10 @@ public class ConcurrentOpenHashMap<K, V> {
         return getSection(h).remove(key, value, (int) h) != null;
     }
 
+    public void removeNullValue(K key) {
+        remove(key, EmptyValue);
+    }
+
     private Section<K, V> getSection(long hash) {
         // Use 32 msb out of long to get the section
         final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index e18012c..254be51 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -369,6 +370,37 @@ public class ConcurrentOpenHashMapTest {
         assertNull(map.get(t1_b));
     }
 
+    @Test
+    public void testNullValue() {
+        ConcurrentOpenHashMap<String, String> map = new 
ConcurrentOpenHashMap<>(16, 1);
+        String key = "a";
+        assertThrows(NullPointerException.class, () -> map.put(key, null));
+
+        //put a null value.
+        assertNull(map.computeIfAbsent(key, k -> null));
+        assertEquals(1, map.size());
+        assertEquals(1, map.keys().size());
+        assertEquals(1, map.values().size());
+        assertNull(map.get(key));
+        assertFalse(map.containsKey(key));
+
+        //test remove null value
+        map.removeNullValue(key);
+        assertTrue(map.isEmpty());
+        assertEquals(0, map.keys().size());
+        assertEquals(0, map.values().size());
+        assertNull(map.get(key));
+        assertFalse(map.containsKey(key));
+
+
+        //test not remove non-null value
+        map.put(key, "V");
+        assertEquals(1, map.size());
+        map.removeNullValue(key);
+        assertEquals(1, map.size());
+
+    }
+
     static final int Iterations = 1;
     static final int ReadIterations = 1000;
     static final int N = 1_000_000;

Reply via email to