sijie closed pull request #2387: Fixed race condition during expansion of 
concurrent open hash maps
URL: https://github.com/apache/incubator-pulsar/pull/2387
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index fcb0c10dce..60c24c08ca 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -30,7 +30,7 @@
 
 /**
  * Map from long to an Object.
- * 
+ *
  * Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
  * <ol>
  * <li>No boxing/unboxing from long -> Long
@@ -187,10 +187,10 @@ public void forEach(EntryProcessor<V> processor) {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private long[] keys;
-        private V[] values;
+        private volatile long[] keys;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 74d4314b19..4634b40f38 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -26,7 +26,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.StampedLock;
-import java.util.function.Predicate;
 
 /**
  * Concurrent hash set where values are composed of pairs of longs.
@@ -163,11 +162,11 @@ public void forEach(LongPairConsumer processor) {
 
     /**
      * Removes all of the elements of this collection that satisfy the given 
predicate.
-     * 
+     *
      * @param filter
      *            a predicate which returns {@code true} for elements to be 
removed
      * @return {@code true} if any elements were removed
-     * 
+     *
      * @return number of removed values
      */
     public int removeIf(LongPairPredicate filter) {
@@ -209,9 +208,9 @@ public int removeIf(LongPairPredicate filter) {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -449,9 +448,11 @@ private void rehash() {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't 
see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * SetFillFactor);
         }
 
@@ -532,7 +533,7 @@ public int compareTo(LongPair o) {
             }
         }
     }
-    
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 585471c3ff..94f64de33e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -31,7 +31,7 @@
 
 /**
  * Concurrent hash map
- * 
+ *
  * Provides similar methods as a ConcurrentMap<K,V> but since it's an open 
hash map with linear probing, no node
  * allocations are required to store the values
  *
@@ -180,9 +180,9 @@ public void forEach(BiConsumer<? super K, ? super V> 
processor) {
     @SuppressWarnings("serial")
     private static final class Section<K, V> extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private Object[] table;
+        private volatile Object[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 93ca6e8ac5..2f27913fe3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -32,7 +32,7 @@
 
 /**
  * Concurrent hash set
- * 
+ *
  * Provides similar methods as a ConcurrentMap<K,V> but since it's an open 
hash map with linear probing, no node
  * allocations are required to store the values
  *
@@ -175,9 +175,9 @@ public String toString() {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private V[] values;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index c4215f3408..e4cbd4781c 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -24,12 +24,15 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -37,11 +40,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongFunction;
 
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-
 public class ConcurrentLongHashMapTest {
 
     @Test
@@ -234,6 +234,57 @@ public void concurrentInsertionsAndReads() throws 
Throwable {
         executor.shutdown();
     }
 
+    @Test
+    public void stressConcurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int writeThreads = 16;
+        final int readThreads = 16;
+        final int n = 1_000_000;
+        String value = "value";
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+                    map.put(key, value);
+                }
+            }));
+        }
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+                    map.get(key);
+                }
+            }));
+        }
+        for (Future<?> future : futures) {
+            future.get();
+        }
+        assertEquals(map.size(), n * writeThreads);
+        executor.shutdown();
+    }
+
     @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to