This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c229b46fe5d [fix][broker] ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException (#25644)
c229b46fe5d is described below

commit c229b46fe5db6f42e2aff4faeb57b9b00e15bbfe
Author: void-ptr974 <[email protected]>
AuthorDate: Sat May 2 22:38:33 2026 +0800

    [fix][broker] ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException (#25644)
---
 .../ConcurrentLongHashMapBenchmark.java            | 411 +++++++++++++++++++++
 .../common/util/collections/package-info.java      |  20 +
 .../util/collections/ConcurrentLongHashMap.java    |  99 ++---
 .../collections/ConcurrentLongHashMapTest.java     | 389 +++++++++++++++++--
 4 files changed, 842 insertions(+), 77 deletions(-)

diff --git a/microbench/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapBenchmark.java b/microbench/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapBenchmark.java
new file mode 100644
index 00000000000..e11f12f914c
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapBenchmark.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util.collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Benchmarks for {@link ConcurrentLongHashMap}.
+ *
+ * <p>Compares two implementations:
+ * <ul>
+ *   <li>{@code clhm} – {@link ConcurrentLongHashMap} (immutable Table snapshot, primitive long
+ *       keys, zero allocation on the key path),</li>
+ *   <li>{@code chm} – {@link java.util.concurrent.ConcurrentHashMap} as the JDK baseline.</li>
+ * </ul>
+ *
+ * <p>Workload mix:
+ * <ul>
+ *   <li>{@link #getHit}/{@link #getMiss}/{@link #putRemove} – single-thread basics.</li>
+ *   <li>{@link #concurrentGetHit} – read-only, 16 threads.</li>
+ *   <li>{@link #concurrentMixedReader}/{@link #concurrentMixedWriter} – an asymmetric concurrent
+ *       group: 12 readers walk the resident key set while 4 writers put/remove keys in disjoint
+ *       partitions outside that set, so any concurrency bug (torn rehash, lost update, partial
+ *       publish) shows up either as a JMH error or as a reduced ops/sec number on the suspect
+ *       implementation.</li>
+ *   <li>{@link #concurrentExpandShrinkWriter}/{@link #concurrentExpandShrinkReader} – starts a
+ *       single-section, auto-shrinking map at a tiny capacity and has 4 writers churn it with
+ *       put/remove while 4 readers perform lookups against it. This is the highest-pressure
+ *       rehash-vs-read benchmark and mirrors the expand/shrink-vs-get race behind the
+ *       ArrayIndexOutOfBoundsException in the pre-Table design.</li>
+ *   <li>{@link #boxingGetHit}/{@link #boxingPutGetRemove}/{@link #boxingConcurrentGetHit}/
+ *       {@link #boxingConcurrentPutRemove} – use keys above the {@code Long.valueOf} cache range
+ *       so every CHM operation has to allocate a fresh boxed Long. Run with {@code -prof gc} to
+ *       see the alloc-rate divergence vs the primitive-long map.</li>
+ * </ul>
+ *
+ * <p>The per-thread cursors index key arrays with a power-of-two mask, so every {@code entries}
+ * parameter must be a power of two; the state setups reject other values rather than silently
+ * skewing the access pattern onto a subset of the keys.
+ *
+ * <p>Run from the repo root (NOTE(review): Gradle invocation as written by the author; confirm it
+ * matches this branch's build tooling):
+ * <pre>{@code
+ * ./gradlew :microbench:shadowJar
+ * java -jar microbench/build/libs/microbench-*-benchmarks.jar \
+ *      "ConcurrentLongHashMapBenchmark.concurrentExpandShrink" -prof gc
+ * }</pre>
+ */
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@BenchmarkMode(Mode.AverageTime)
+@Fork(1)
+@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+public class ConcurrentLongHashMapBenchmark {
+
+    /**
+     * Shared benchmark state for the steady-state benchmarks. The map is populated with the keys
+     * {@code [0, entries)} once per trial. The mutating workloads only touch keys outside that
+     * resident set: {@link #nextWriteKey()} starts at {@code 1L << 48}, and {@link WriterState}
+     * partitions start at {@code 1L << 40}. This keeps the "hit" benchmarks hitting for the whole
+     * trial.
+     */
+    @State(Scope.Benchmark)
+    public static class MapState {
+        @Param({"clhm", "chm"})
+        private String implementation;
+
+        @Param({"1024", "65536"})
+        private int entries;
+
+        private long[] presentKeys;
+        private long[] absentKeys;
+        private ConcurrentLongHashMap<String> clhm;
+        private ConcurrentHashMap<Long, String> chm;
+        private AtomicLong writeKey;
+
+        @Setup(Level.Trial)
+        public void setup() {
+            requirePowerOfTwo(entries, "entries");
+            presentKeys = new long[entries];
+            absentKeys = new long[entries];
+            clhm = ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(entries)
+                    .concurrencyLevel(16)
+                    .build();
+            chm = new ConcurrentHashMap<>(entries, 0.66f, 16);
+
+            for (int i = 0; i < entries; i++) {
+                long key = i;
+                presentKeys[i] = key;
+                // Flipping the sign bit yields a negative key that is never inserted.
+                absentKeys[i] = key ^ Long.MIN_VALUE;
+                clhm.put(key, "value");
+                chm.put(key, "value");
+            }
+
+            // Well clear of the resident keys [0, entries).
+            writeKey = new AtomicLong(1L << 48);
+        }
+
+        String get(long key) {
+            return "clhm".equals(implementation) ? clhm.get(key) : chm.get(key);
+        }
+
+        void put(long key, String value) {
+            if ("clhm".equals(implementation)) {
+                clhm.put(key, value);
+            } else {
+                chm.put(key, value);
+            }
+        }
+
+        void remove(long key) {
+            if ("clhm".equals(implementation)) {
+                clhm.remove(key);
+            } else {
+                chm.remove(key);
+            }
+        }
+
+        long nextWriteKey() {
+            return writeKey.getAndIncrement();
+        }
+    }
+
+    /**
+     * Per-thread cursor state. {@link #next(int)} masks with {@code length - 1}, so it only visits
+     * every index when {@code length} is a power of two.
+     */
+    @State(Scope.Thread)
+    public static class CursorState {
+        private int index;
+
+        int next(int length) {
+            int value = index;
+            index = value + 1;
+            return value & (length - 1);
+        }
+    }
+
+    /**
+     * Rejects sizes the masking cursors cannot cover uniformly.
+     *
+     * @param value the size to check
+     * @param name  parameter name used in the error message
+     * @return {@code value}, unchanged
+     * @throws IllegalArgumentException if {@code value} is not a positive power of two
+     */
+    private static int requirePowerOfTwo(int value, String name) {
+        if (value <= 0 || Integer.bitCount(value) != 1) {
+            throw new IllegalArgumentException(name + " must be a positive power of two: " + value);
+        }
+        return value;
+    }
+
+    /**
+     * Per-thread key stream for the concurrent benchmarks.
+     *
+     * <p>Every thread that injects this state gets a fresh partition of the long key space on each
+     * iteration: {@code [(p + 1) << 40, ((p + 1) << 40) + WORKING_SET)}. The thread then cycles
+     * through that partition, so threads never touch each other's keys and the working set stays
+     * bounded.
+     *
+     * <p>Partition index 0 is deliberately skipped. Its range {@code [0, 1024)} would overlap
+     * {@link MapState}'s resident keys {@code [0, entries)}, and the put/remove writers would then
+     * permanently delete resident keys, turning "hit" reads into misses.
+     */
+    @State(Scope.Thread)
+    public static class WriterState {
+        // Keep a small per-thread working set so put + remove pair cleanly without unbounded growth.
+        private static final int WORKING_SET = 1024;
+        private static final AtomicLong NEXT_PARTITION = new AtomicLong();
+
+        private long base;
+        private long offset;
+
+        @Setup(Level.Iteration)
+        public void setup() {
+            base = (NEXT_PARTITION.getAndIncrement() + 1) << 40;
+            offset = 0;
+        }
+
+        long nextKey() {
+            long k = base + (offset & (WORKING_SET - 1));
+            offset++;
+            return k;
+        }
+    }
+
+    @Benchmark
+    public void getHit(MapState map, CursorState cursor, Blackhole blackhole) {
+        blackhole.consume(map.get(map.presentKeys[cursor.next(map.presentKeys.length)]));
+    }
+
+    @Benchmark
+    public void getMiss(MapState map, CursorState cursor, Blackhole blackhole) {
+        blackhole.consume(map.get(map.absentKeys[cursor.next(map.absentKeys.length)]));
+    }
+
+    @Benchmark
+    public void putRemove(MapState map, Blackhole blackhole) {
+        long key = map.nextWriteKey();
+        map.put(key, "value");
+        blackhole.consume(map.get(key));
+        map.remove(key);
+    }
+
+    @Benchmark
+    @Threads(16)
+    public void concurrentGetHit(MapState map, CursorState cursor, Blackhole blackhole) {
+        blackhole.consume(map.get(map.presentKeys[cursor.next(map.presentKeys.length)]));
+    }
+
+    /** Reader half of the asymmetric mixed-workload group: 12 reader threads. */
+    @Benchmark
+    @Group("concurrentMixed")
+    @GroupThreads(12)
+    public void concurrentMixedReader(MapState map, CursorState cursor, Blackhole blackhole) {
+        blackhole.consume(map.get(map.presentKeys[cursor.next(map.presentKeys.length)]));
+    }
+
+    /** Writer half of the asymmetric mixed-workload group: 4 writer threads. */
+    @Benchmark
+    @Group("concurrentMixed")
+    @GroupThreads(4)
+    public void concurrentMixedWriter(MapState map, WriterState w, Blackhole blackhole) {
+        long key = w.nextKey();
+        map.put(key, "value");
+        blackhole.consume(map.get(key));
+        map.remove(key);
+    }
+
+    /**
+     * Holds a tiny, auto-shrinking, single-section map that is rebuilt for every iteration
+     * ({@code expectedItems(2)}, {@code concurrencyLevel(1)}, {@code mapIdleFactor(0.25f)}).
+     *
+     * <p>The concurrent writers' put/remove pairs are intended to keep pushing the section across
+     * its expand and shrink thresholds, so lookups race against rehashes. The readers draw keys
+     * from their own {@link WriterState} partitions, which no writer touches, so every read is a
+     * miss that probes a table being rehashed underneath it. Readers do not observe the writers'
+     * values.
+     */
+    @State(Scope.Benchmark)
+    public static class ChurningMapState {
+        @Param({"clhm", "chm"})
+        private String implementation;
+
+        private ConcurrentLongHashMap<String> clhm;
+        private ConcurrentHashMap<Long, String> chm;
+
+        @Setup(Level.Iteration)
+        public void setup() {
+            clhm = ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(2)
+                    .concurrencyLevel(1)
+                    .autoShrink(true)
+                    .mapIdleFactor(0.25f)
+                    .build();
+            chm = new ConcurrentHashMap<>(4, 0.66f, 1);
+        }
+
+        String get(long key) {
+            return "clhm".equals(implementation) ? clhm.get(key) : chm.get(key);
+        }
+
+        String put(long key, String value) {
+            return "clhm".equals(implementation) ? clhm.put(key, value) : chm.put(key, value);
+        }
+
+        String remove(long key) {
+            return "clhm".equals(implementation) ? clhm.remove(key) : chm.remove(key);
+        }
+    }
+
+    /** Writer driving constant expand+shrink on a single section. */
+    @Benchmark
+    @Group("concurrentExpandShrink")
+    @GroupThreads(4)
+    public void concurrentExpandShrinkWriter(ChurningMapState map, WriterState w, Blackhole bh) {
+        long k1 = w.nextKey();
+        long k2 = w.nextKey();
+        bh.consume(map.put(k1, "v"));
+        bh.consume(map.put(k2, "v"));
+        bh.consume(map.remove(k1));
+        bh.consume(map.remove(k2));
+    }
+
+    /**
+     * Reader racing lookups against the writers' rehashes.
+     *
+     * <p>Each reader probes keys from its own {@link WriterState} partition, so every lookup is a
+     * miss; the point is to traverse a table that is being replaced concurrently, not to read the
+     * writers' values. No result is validated here: a torn table shows up as an exception (such as
+     * ArrayIndexOutOfBoundsException) failing the run.
+     */
+    @Benchmark
+    @Group("concurrentExpandShrink")
+    @GroupThreads(4)
+    public void concurrentExpandShrinkReader(ChurningMapState map, WriterState w, Blackhole bh) {
+        bh.consume(map.get(w.nextKey()));
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Boxing-impact workload
+    //
+    // Pulsar's actual usage of ConcurrentLongHashMap stores object values (CompletableFuture,
+    // Producer, Consumer, ...). The choice to keep a primitive-long map instead of switching to
+    // ConcurrentHashMap<Long, V> hinges on whether the long->Long autobox on every operation
+    // materially hurts throughput and GC pressure in practice.
+    //
+    // To make that visible to JMH we have to defeat the JDK's Long.valueOf cache, which
+    // short-circuits values in [-128, 127] to a shared instance. Keys here are seeded above the
+    // cache range and monotonically increase, so every CHM operation has to allocate a fresh
+    // boxed Long, while the primitive-long map sees zero allocation on the key path. Run with
+    // `-prof gc` to see the alloc-rate divergence on top of the throughput numbers.
+    // ------------------------------------------------------------------------------------------
+
+    @State(Scope.Benchmark)
+    public static class BoxingMapState {
+        @Param({"clhm", "chm"})
+        private String implementation;
+
+        @Param({"1024", "65536"})
+        private int entries;
+
+        // Lock granularity. Override with `-p concurrency=...` to match CHM's bucket-level
+        // striping: default JDK CHM uses one synchronized monitor per bucket, so concurrency=1024
+        // on a 1024-bucket map effectively gives the primitive map a comparable lock count. For
+        // CHM itself the concurrencyLevel constructor argument is only a sizing hint.
+        @Param({"16"})
+        private int concurrency;
+
+        private long[] presentKeys;
+        private ConcurrentLongHashMap<String> clhm;
+        private ConcurrentHashMap<Long, String> chm;
+
+        @Setup(Level.Trial)
+        public void setup() {
+            requirePowerOfTwo(entries, "entries");
+            presentKeys = new long[entries];
+            int cl = Math.min(concurrency, entries); // builder requires expectedItems >= concurrencyLevel
+            clhm = ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(entries).concurrencyLevel(cl).build();
+            chm = new ConcurrentHashMap<>(entries, 0.66f, cl);
+
+            // Start the key space well above 127 so Long.valueOf cannot serve from its cache.
+            // Use an odd stride so consecutive keys land in different sections / cache lines.
+            final long base = 1L << 32;
+            for (int i = 0; i < entries; i++) {
+                long key = base + ((long) i) * 31L;
+                presentKeys[i] = key;
+                clhm.put(key, "value");
+                chm.put(key, "value");
+            }
+        }
+
+        String get(long key) {
+            return "clhm".equals(implementation) ? clhm.get(key) : chm.get(key);
+        }
+
+        String put(long key, String value) {
+            return "clhm".equals(implementation) ? clhm.put(key, value) : chm.put(key, value);
+        }
+
+        String remove(long key) {
+            return "clhm".equals(implementation) ? clhm.remove(key) : chm.remove(key);
+        }
+    }
+
+    /**
+     * Per-thread cursor for the boxing benchmarks; masks with {@code length - 1}, so
+     * {@code length} must be a power of two.
+     */
+    @State(Scope.Thread)
+    public static class BoxingCursor {
+        private int index;
+
+        int next(int length) {
+            int v = index;
+            index = v + 1;
+            return v & (length - 1);
+        }
+    }
+
+    @Benchmark
+    public void boxingGetHit(BoxingMapState map, BoxingCursor cur, Blackhole bh) {
+        bh.consume(map.get(map.presentKeys[cur.next(map.presentKeys.length)]));
+    }
+
+    @Benchmark
+    @Threads(16)
+    public void boxingConcurrentGetHit(BoxingMapState map, BoxingCursor cur, Blackhole bh) {
+        bh.consume(map.get(map.presentKeys[cur.next(map.presentKeys.length)]));
+    }
+
+    /** Replaces, reads, removes and re-inserts a resident key, so the resident set is restored. */
+    @Benchmark
+    public void boxingPutGetRemove(BoxingMapState map, BoxingCursor cur, Blackhole bh) {
+        long key = map.presentKeys[cur.next(map.presentKeys.length)];
+        bh.consume(map.put(key, "value"));
+        bh.consume(map.get(key));
+        bh.consume(map.remove(key));
+        bh.consume(map.put(key, "value"));
+    }
+
+    /**
+     * Per-thread key stream for {@link #boxingConcurrentPutRemove}.
+     *
+     * <p>Partition {@code p} starts at {@code (1L << 40) + (p << 32)}, above every resident boxing
+     * key, and advances with an odd stride. The two terms are combined with addition rather than
+     * bitwise OR so that the bases stay distinct for every {@code p}. With OR, partition 256 would
+     * alias partition 0, because {@code 256L << 32 == 1L << 40}.
+     */
+    @State(Scope.Thread)
+    public static class BoxingWriterState {
+        private static final AtomicLong NEXT_PARTITION = new AtomicLong();
+        private long base;
+        private long offset;
+
+        @Setup(Level.Iteration)
+        public void setup() {
+            base = (1L << 40) + (NEXT_PARTITION.getAndIncrement() << 32);
+            offset = 0;
+        }
+
+        long nextKey() {
+            return base + (offset++) * 31L;
+        }
+    }
+
+    @Benchmark
+    @Threads(16)
+    public void boxingConcurrentPutRemove(BoxingMapState map, BoxingWriterState w, Blackhole bh) {
+        long key = w.nextKey();
+        bh.consume(map.put(key, "value"));
+        bh.consume(map.remove(key));
+    }
+}
diff --git a/microbench/src/main/java/org/apache/pulsar/common/util/collections/package-info.java b/microbench/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
new file mode 100644
index 00000000000..9b2a21422d6
--- /dev/null
+++ b/microbench/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util.collections;
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index b6408ee9819..780b6454a1c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util.collections;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -185,7 +186,7 @@ public class ConcurrentLongHashMap<V> {
     public long capacity() {
         long capacity = 0;
         for (Section<V> s : sections) {
-            capacity += s.capacity;
+            capacity += s.table.capacity();
         }
         return capacity;
     }
@@ -286,13 +287,18 @@ public class ConcurrentLongHashMap<V> {
         void accept(long key, V value);
     }
 
-    // A section is a portion of the hash map that is covered by a single
+    // A section is a portion of the hash map that is covered by a single 
lock. The keys, values
+    // and capacity arrays are bundled into an immutable Table snapshot so 
that readers always see
+    // a consistent (key, value, length) triple, eliminating the 
partial-publish race that the
+    // previous design had to paper over with Math.min(keys.length, 
values.length).
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private volatile long[] keys;
-        private volatile V[] values;
+        private record Table<V>(long[] keys, V[] values, int capacity) { }
+
+        // Section is Serializable only by inheritance from StampedLock; never 
actually serialized.
+        @SuppressFBWarnings("SE_BAD_FIELD")
+        private volatile Table<V> table;
 
-        private volatile int capacity;
         private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
@@ -309,10 +315,9 @@ public class ConcurrentLongHashMap<V> {
 
         Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
                 float expandFactor, float shrinkFactor) {
-            this.capacity = alignToPowerOfTwo(capacity);
-            this.initCapacity = this.capacity;
-            this.keys = new long[this.capacity];
-            this.values = (V[]) new Object[this.capacity];
+            int initial = alignToPowerOfTwo(capacity);
+            this.initCapacity = initial;
+            this.table = new Table<>(new long[initial], (V[]) new 
Object[initial], initial);
             this.size = 0;
             this.usedBuckets = 0;
             this.autoShrink = autoShrink;
@@ -320,19 +325,18 @@ public class ConcurrentLongHashMap<V> {
             this.mapIdleFactor = mapIdleFactor;
             this.expandFactor = expandFactor;
             this.shrinkFactor = shrinkFactor;
-            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
-            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
+            this.resizeThresholdUp = (int) (initial * mapFillFactor);
+            this.resizeThresholdBelow = (int) (initial * mapIdleFactor);
         }
 
         V get(long key, int keyHash) {
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
 
-            // add local variable here, so OutOfBound won't happen
-            long[] keys = this.keys;
-            V[] values = this.values;
-            // calculate table.length as capacity to avoid rehash changing 
capacity
-            int bucket = signSafeMod(keyHash, values.length);
+            Table<V> table = this.table;
+            long[] keys = table.keys();
+            V[] values = table.values();
+            int bucket = signSafeMod(keyHash, table.capacity());
 
             try {
                 while (true) {
@@ -354,10 +358,10 @@ public class ConcurrentLongHashMap<V> {
                             stamp = readLock();
                             acquiredLock = true;
 
-                            // update local variable
-                            keys = this.keys;
-                            values = this.values;
-                            bucket = signSafeMod(keyHash, values.length);
+                            table = this.table;
+                            keys = table.keys();
+                            values = table.values();
+                            bucket = signSafeMod(keyHash, table.capacity());
                             storedKey = keys[bucket];
                             storedValue = values[bucket];
                         }
@@ -369,7 +373,7 @@ public class ConcurrentLongHashMap<V> {
                             return null;
                         }
                     }
-                    bucket = (bucket + 1) & (values.length - 1);
+                    bucket = (bucket + 1) & (table.capacity() - 1);
                 }
             } finally {
                 if (acquiredLock) {
@@ -382,7 +386,10 @@ public class ConcurrentLongHashMap<V> {
             int bucket = keyHash;
 
             long stamp = writeLock();
-            int capacity = this.capacity;
+            Table<V> table = this.table;
+            long[] keys = table.keys();
+            V[] values = table.values();
+            int capacity = table.capacity();
 
             // Remember where we find the first available spot
             int firstDeletedKey = -1;
@@ -450,10 +457,13 @@ public class ConcurrentLongHashMap<V> {
         private V remove(long key, Object value, int keyHash) {
             int bucket = keyHash;
             long stamp = writeLock();
+            Table<V> table = this.table;
+            long[] keys = table.keys();
+            V[] values = table.values();
+            int capacity = table.capacity();
 
             try {
                 while (true) {
-                    int capacity = this.capacity;
                     bucket = signSafeMod(bucket, capacity);
 
                     long storedKey = keys[bucket];
@@ -521,11 +531,12 @@ public class ConcurrentLongHashMap<V> {
             long stamp = writeLock();
 
             try {
-                if (autoShrink && capacity > initCapacity) {
+                Table<V> table = this.table;
+                if (autoShrink && table.capacity() > initCapacity) {
                     shrinkToInitCapacity();
                 } else {
-                    Arrays.fill(keys, 0);
-                    Arrays.fill(values, EmptyValue);
+                    Arrays.fill(table.keys(), 0);
+                    Arrays.fill(table.values(), EmptyValue);
                     this.size = 0;
                     this.usedBuckets = 0;
                 }
@@ -537,19 +548,20 @@ public class ConcurrentLongHashMap<V> {
         public void forEach(EntryProcessor<V> processor) {
             long stamp = tryOptimisticRead();
 
-            // We need to make sure that we read these 3 variables in a 
consistent way
-            int capacity = this.capacity;
-            long[] keys = this.keys;
-            V[] values = this.values;
+            Table<V> table = this.table;
+            int capacity = table.capacity();
+            long[] keys = table.keys();
+            V[] values = table.values();
 
             // Validate no rehashing
             if (!validate(stamp)) {
                 // Fallback to read lock
                 stamp = readLock();
 
-                capacity = this.capacity;
-                keys = this.keys;
-                values = this.values;
+                table = this.table;
+                capacity = table.capacity();
+                keys = table.keys();
+                values = table.values();
                 unlockRead(stamp);
             }
 
@@ -587,6 +599,9 @@ public class ConcurrentLongHashMap<V> {
             // Expand the hashmap
             long[] newKeys = new long[newCapacity];
             V[] newValues = (V[]) new Object[newCapacity];
+            Table<V> table = this.table;
+            long[] keys = table.keys();
+            V[] values = table.values();
 
             // Re-hash table
             for (int i = 0; i < keys.length; i++) {
@@ -597,27 +612,21 @@ public class ConcurrentLongHashMap<V> {
                 }
             }
 
-            keys = newKeys;
-            values = newValues;
-            capacity = newCapacity;
+            this.table = new Table<>(newKeys, newValues, newCapacity);
             usedBuckets = size;
-            resizeThresholdUp = (int) (capacity * mapFillFactor);
-            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+            resizeThresholdUp = (int) (newCapacity * mapFillFactor);
+            resizeThresholdBelow = (int) (newCapacity * mapIdleFactor);
         }
 
         private void shrinkToInitCapacity() {
             long[] newKeys = new long[initCapacity];
             V[] newValues = (V[]) new Object[initCapacity];
 
-            keys = newKeys;
-            values = newValues;
+            table = new Table<>(newKeys, newValues, initCapacity);
             size = 0;
             usedBuckets = 0;
-            // Capacity needs to be updated after the values, so that we won't 
see
-            // a capacity value bigger than the actual array size
-            capacity = initCapacity;
-            resizeThresholdUp = (int) (capacity * mapFillFactor);
-            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+            resizeThresholdUp = (int) (initCapacity * mapFillFactor);
+            resizeThresholdBelow = (int) (initCapacity * mapIdleFactor);
         }
 
         private static <V> void insertKeyValueNoLock(long[] keys, V[] values, 
long key, V value) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 8ba985ed329..bf9eede4837 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
@@ -35,7 +36,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongFunction;
 import lombok.Cleanup;
@@ -211,65 +214,387 @@ public class ConcurrentLongHashMapTest {
         assertTrue(map.capacity() == initCapacity);
     }
 
+    /**
+     * Spins many readers against a section that is constantly expanding and 
shrinking. The
+     * stable key '1' is never removed, so every read must observe "v1"; 
volatile keys 2/3 may or
+     * may not be present at any instant. Any torn read or sentinel leak 
surfaces as an
+     * AssertionError or runtime exception captured in {@code ex}.
+     */
     @Test
-    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+    public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
         ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
                 .expectedItems(2)
                 .concurrencyLevel(1)
                 .autoShrink(true)
                 .mapIdleFactor(0.25f)
                 .build();
-        assertEquals(map.capacity(), 4);
 
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
         final int readThreads = 16;
         final int writeThreads = 1;
         final int n = 1_000;
-        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
-        Future<?> future = null;
-        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        CyclicBarrier barrier = new CyclicBarrier(readThreads + writeThreads);
+        AtomicReference<Throwable> ex = new AtomicReference<>();
+        List<Future<?>> futures = new ArrayList<>();
+        AtomicBoolean writerDone = new AtomicBoolean(false);
+
+        assertNull(map.put(1, "v1"));
 
         for (int i = 0; i < readThreads; i++) {
-            executor.submit(() -> {
+            futures.add(executor.submit(() -> {
+                barrier.await();
                 try {
-                    barrier.await();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
+                    while (!writerDone.get()) {
+                        assertEquals(map.get(1), "v1");
+                        map.get(2);
+                        map.get(3);
+                    }
+                } catch (Throwable t) {
+                    ex.compareAndSet(null, t);
+                }
+                return null;
+            }));
+        }
+
+        futures.add(executor.submit(() -> {
+            barrier.await();
+            try {
+                for (int i = 0; i < n; i++) {
+                    assertNull(map.put(2, "v2"));
+                    assertNull(map.put(3, "v3"));
+                    assertEquals(map.capacity(), 8);
+
+                    assertTrue(map.remove(2, "v2"));
+                    assertTrue(map.remove(3, "v3"));
+                    assertEquals(map.capacity(), 4);
+                }
+            } finally {
+                writerDone.set(true);
+            }
+            return null;
+        }));
+
+        for (Future<?> future : futures) {
+            future.get(60, TimeUnit.SECONDS);
+        }
+
+        assertNull(ex.get());
+    }
+
+    /**
+     * Many concurrent writers all targeting the same section so {@code 
put}/{@code remove} race
+     * against {@code rehash} (both expand and shrink). Each writer owns a 
disjoint key range so
+     * the post-condition is deterministic. Readers concurrently look up every 
key written.
+     */
+    @Test
+    public void testConcurrentMultiWriterExpandShrink() throws Throwable {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(4)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.20f)
+                .build();
+
+        final int writeThreads = 8;
+        final int readThreads = 8;
+        final int rounds = 200;
+        final int keysPerThread = 64;
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        AtomicReference<Throwable> ex = new AtomicReference<>();
+        AtomicBoolean writersDone = new AtomicBoolean(false);
+        List<Future<?>> futures = new ArrayList<>();
+
+        for (int t = 0; t < writeThreads; t++) {
+            final long base = (long) t * keysPerThread;
+            futures.add(executor.submit(() -> {
+                barrier.await();
+                try {
+                    for (int round = 0; round < rounds; round++) {
+                        for (int k = 0; k < keysPerThread; k++) {
+                            map.put(base + k, "v-" + (base + k));
+                        }
+                        for (int k = 0; k < keysPerThread; k++) {
+                            assertEquals(map.get(base + k), "v-" + (base + k));
+                        }
+                        for (int k = 0; k < keysPerThread; k++) {
+                            assertEquals(map.remove(base + k), "v-" + (base + 
k));
+                        }
+                        for (int k = 0; k < keysPerThread; k++) {
+                            assertNull(map.get(base + k));
+                        }
+                    }
+                } catch (Throwable th) {
+                    ex.compareAndSet(null, th);
                 }
+                return null;
+            }));
+        }
+
+        for (int r = 0; r < readThreads; r++) {
+            futures.add(executor.submit(() -> {
+                barrier.await();
                 try {
-                    map.get(1);
-                } catch (Exception e) {
-                    ex.set(e);
+                    long total = (long) writeThreads * keysPerThread;
+                    long key = 0;
+                    while (!writersDone.get()) {
+                        String v = map.get(key);
+                        if (v != null && !v.equals("v-" + key)) {
+                            throw new AssertionError("torn read for key " + 
key + ": " + v);
+                        }
+                        key = (key + 1) % total;
+                    }
+                } catch (Throwable th) {
+                    ex.compareAndSet(null, th);
                 }
-            });
+                return null;
+            }));
         }
 
-        assertNull(map.put(1, "v1"));
-        future = executor.submit(() -> {
-            try {
+        for (int i = 0; i < writeThreads; i++) {
+            futures.get(i).get(120, TimeUnit.SECONDS);
+        }
+        writersDone.set(true);
+        for (int i = writeThreads; i < futures.size(); i++) {
+            futures.get(i).get(60, TimeUnit.SECONDS);
+        }
+
+        assertNull(ex.get());
+        assertEquals(map.size(), 0);
+    }
+
+    /**
+     * Differential test against {@link 
java.util.concurrent.ConcurrentHashMap}. Each thread owns
+     * a disjoint key partition (so any single-key sequence is linearizable), 
but every operation
+     * is mirrored onto both maps. Per-call return values must agree, and 
after the workload the
+     * two maps must contain exactly the same entries — including the reverse 
direction.
+     */
+    @Test
+    public void testCorrectnessAgainstConcurrentHashMap() throws Throwable {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(4)
+                .autoShrink(true)
+                .mapIdleFactor(0.20f)
+                .build();
+        ConcurrentHashMap<Long, String> reference = new ConcurrentHashMap<>();
+
+        final int nThreads = 8;
+        final int opsPerThread = 50_000;
+        final int keyRange = 2048;
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+        CyclicBarrier barrier = new CyclicBarrier(nThreads);
+        List<Future<?>> futures = new ArrayList<>();
+
+        for (int t = 0; t < nThreads; t++) {
+            final int threadId = t;
+            final long base = (long) threadId << 40;
+            futures.add(executor.submit(() -> {
+                Random rnd = new Random(threadId);
                 barrier.await();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+                for (int i = 0; i < opsPerThread; i++) {
+                    long key = base + rnd.nextInt(keyRange);
+                    int op = rnd.nextInt(5);
+                    String value = "v-" + threadId + "-" + i;
+                    switch (op) {
+                        case 0:
+                            assertEquals(map.put(key, value), 
reference.put(key, value));
+                            break;
+                        case 1:
+                            assertEquals(map.putIfAbsent(key, value), 
reference.putIfAbsent(key, value));
+                            break;
+                        case 2:
+                            assertEquals(map.remove(key), 
reference.remove(key));
+                            break;
+                        case 3:
+                            assertEquals(map.get(key), reference.get(key));
+                            break;
+                        default:
+                            assertEquals(map.containsKey(key), 
reference.containsKey(key));
+                            break;
+                    }
+                }
+                return null;
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get(120, TimeUnit.SECONDS);
+        }
+
+        assertEquals(map.size(), (long) reference.size());
+        for (Map.Entry<Long, String> e : reference.entrySet()) {
+            assertEquals(map.get(e.getKey()), e.getValue());
+        }
+        AtomicLong observed = new AtomicLong();
+        map.forEach((k, v) -> {
+            observed.incrementAndGet();
+            assertEquals(v, reference.get(k));
+        });
+        assertEquals(observed.get(), (long) reference.size());
+    }
+
+    /**
+     * Cross-thread put-publish-then-read invariant: once a {@code put(k, v)} 
has returned and the
+     * writer has published k via a volatile counter, EVERY reader that 
observes that counter must
+     * see a non-null value for k. A failure here would mean a successful put 
was "lost" by the
+     * map's get path — the failure mode the Table-snapshot design exists to 
prevent.
+     *
+     * <p>The map starts at the smallest legal capacity with autoShrink 
enabled, so the rehash
+     * code path is exercised on virtually every put. This is the most 
aggressive workload for
+     * the rehash-vs-get race that the previous separate-volatile-arrays 
design couldn't survive.
+     */
+    @Test
+    public void testNoLostGetAfterPublish() throws Throwable {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+
+        final int totalKeys = 50_000;
+        final int readerThreads = 8;
+
+        AtomicLong highestPublished = new AtomicLong(-1);
+        AtomicReference<Throwable> ex = new AtomicReference<>();
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+        CyclicBarrier barrier = new CyclicBarrier(readerThreads + 1);
+        List<Future<?>> futures = new ArrayList<>();
+
+        // Writer: put then publish. The volatile-set on highestPublished 
establishes
+        // happens-before with any reader that observes the published value.
+        futures.add(executor.submit(() -> {
+            barrier.await();
+            for (int i = 0; i < totalKeys; i++) {
+                assertNull(map.put(i, "v" + i));
+                highestPublished.set(i);
             }
+            return null;
+        }));
+
+        // Readers: observe the published counter, then verify every key in 
[0, counter] is
+        // present with the expected value. The reader pulls the counter once 
per cycle and
+        // catches up to it before pulling again.
+        for (int r = 0; r < readerThreads; r++) {
+            futures.add(executor.submit(() -> {
+                barrier.await();
+                try {
+                    long lastChecked = -1;
+                    while (lastChecked < totalKeys - 1) {
+                        long target = highestPublished.get();
+                        while (lastChecked < target) {
+                            lastChecked++;
+                            String v = map.get(lastChecked);
+                            if (v == null) {
+                                throw new AssertionError(
+                                        "lost get for key " + lastChecked
+                                                + "; highestPublished=" + 
target);
+                            }
+                            if (!v.equals("v" + lastChecked)) {
+                                throw new AssertionError(
+                                        "wrong value for key " + lastChecked + 
": " + v);
+                            }
+                        }
+                    }
+                } catch (Throwable t) {
+                    ex.compareAndSet(null, t);
+                }
+                return null;
+            }));
+        }
+
+        for (Future<?> f : futures) {
+            f.get(120, TimeUnit.SECONDS);
+        }
+
+        assertNull(ex.get());
+        assertEquals(map.size(), (long) totalKeys);
+    }
 
-            for (int i = 0; i < n; i++) {
-                // expand hashmap
-                assertNull(map.put(2, "v2"));
-                assertNull(map.put(3, "v3"));
-                assertEquals(map.capacity(), 8);
+    /**
+     * forEach during concurrent writes is documented as not strongly 
thread-safe, but it must
+     * never throw, never expose {@code DeletedValue}/{@code EmptyValue} 
sentinels, and every
+     * observed (key, value) pair must be a legitimate pair that was written 
at some point.
+     */
+    @Test
+    public void testForEachDuringWrites() throws Throwable {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
 
-                // shrink hashmap
-                assertTrue(map.remove(2, "v2"));
-                assertTrue(map.remove(3, "v3"));
-                assertEquals(map.capacity(), 4);
+        final int writers = 4;
+        final int keysPerWriter = 256;
+        final int writeRounds = 200;
+        final int forEachRounds = 100;
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+        CyclicBarrier barrier = new CyclicBarrier(writers + 1);
+        AtomicReference<Throwable> ex = new AtomicReference<>();
+        AtomicBoolean writersDone = new AtomicBoolean(false);
+        List<Future<?>> futures = new ArrayList<>();
+
+        for (int t = 0; t < writers; t++) {
+            final long base = (long) t * keysPerWriter;
+            futures.add(executor.submit(() -> {
+                barrier.await();
+                try {
+                    for (int round = 0; round < writeRounds; round++) {
+                        for (int k = 0; k < keysPerWriter; k++) {
+                            map.put(base + k, "v-" + (base + k));
+                        }
+                        for (int k = 0; k < keysPerWriter; k++) {
+                            map.remove(base + k);
+                        }
+                    }
+                } catch (Throwable th) {
+                    ex.compareAndSet(null, th);
+                }
+                return null;
+            }));
+        }
+
+        futures.add(executor.submit(() -> {
+            barrier.await();
+            try {
+                for (int round = 0; round < forEachRounds && 
!writersDone.get(); round++) {
+                    AtomicInteger seen = new AtomicInteger();
+                    map.forEach((k, v) -> {
+                        seen.incrementAndGet();
+                        String expected = "v-" + k;
+                        if (!expected.equals(v)) {
+                            throw new AssertionError("Inconsistent (k,v): (" + 
k + "," + v + ")");
+                        }
+                    });
+                    long sz = map.size();
+                    assertTrue(sz >= 0, "size went negative: " + sz);
+                    assertTrue(sz <= (long) writers * keysPerWriter, "size > 
universe: " + sz);
+                }
+            } catch (Throwable th) {
+                ex.compareAndSet(null, th);
             }
-        });
+            return null;
+        }));
+
+        for (int i = 0; i < writers; i++) {
+            futures.get(i).get(120, TimeUnit.SECONDS);
+        }
+        writersDone.set(true);
+        futures.get(writers).get(60, TimeUnit.SECONDS);
 
-        future.get();
-        assertTrue(ex.get() == null);
-        // shut down pool
-        executor.shutdown();
+        assertNull(ex.get());
     }
 
     @Test


Reply via email to