This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch feature/concurrent-fastutil-maps
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5932e34a88610e2958d49313bfefe4c54175c98c
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Mar 14 05:10:34 2026 -0700

    [Enhancement](memory) Add ConcurrentLong2ObjectHashMap and 
ConcurrentLong2LongHashMap
    
    Add thread-safe primitive-key concurrent hash maps built on fastutil,
    designed to replace ConcurrentHashMap<Long, V> and ConcurrentHashMap<Long, 
Long>
    in memory-sensitive FE paths.
    
    These maps eliminate Long autoboxing overhead and reduce per-entry memory
    from ~64 bytes (ConcurrentHashMap) to ~16 bytes, a 4x improvement.
    
    Key design:
    - Segment-based locking (default 16 segments) for concurrent throughput
    - Full Map interface compatibility for drop-in replacement
    - Atomic putIfAbsent, computeIfAbsent, replace, remove operations
    - Comprehensive unit tests covering CRUD, concurrency, iteration,
      edge cases, and default value semantics
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../doris/common/ConcurrentLong2LongHashMap.java   | 486 +++++++++++++++++++++
 .../doris/common/ConcurrentLong2ObjectHashMap.java | 443 +++++++++++++++++++
 .../common/ConcurrentLong2LongHashMapTest.java     | 455 +++++++++++++++++++
 .../common/ConcurrentLong2ObjectHashMapTest.java   | 432 ++++++++++++++++++
 4 files changed, 1816 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java
new file mode 100644
index 00000000000..0cb8de92dcd
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongFunction;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongBinaryOperator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
+
+/**
+ * A concurrent map with primitive long keys and primitive long values, backed 
by segmented
+ * {@link Long2LongOpenHashMap} instances with {@link ReentrantReadWriteLock} 
per segment.
+ *
+ * <p>This class saves ~48 bytes per entry compared to {@code 
ConcurrentHashMap<Long, Long>}
+ * by avoiding boxing of both keys and values. For fields like partition 
update row counts
+ * with millions of entries, this translates to hundreds of MB of heap savings.
+ *
+ * <p>The {@link #addTo(long, long)} method provides atomic increment 
semantics, useful for
+ * counter patterns.
+ *
+ * <p><b>Important:</b> All compound operations from both {@link Long2LongMap} 
and {@link Map}
+ * interfaces are overridden to ensure atomicity within a segment's write lock.
+ */
public class ConcurrentLong2LongHashMap extends AbstractLong2LongMap {

    // Default number of independently locked segments; must be a power of 2.
    private static final int DEFAULT_SEGMENT_COUNT = 16;
    // Initial capacity of each segment's backing Long2LongOpenHashMap.
    private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16;

    // A key is always routed to the same segment (see segmentFor), so per-key
    // operations only contend with other keys hashing to the same segment.
    private final Segment[] segments;
    private final int segmentMask;
    private final int segmentBits;

    /**
     * Creates a map with the default segment count (16).
     */
    public ConcurrentLong2LongHashMap() {
        this(DEFAULT_SEGMENT_COUNT);
    }

    /**
     * Creates a map with the given number of segments.
     *
     * @param segmentCount number of lock-striped segments; must be a positive power of 2
     * @throws IllegalArgumentException if segmentCount is not a positive power of 2
     */
    public ConcurrentLong2LongHashMap(int segmentCount) {
        // (x & (x - 1)) == 0 is the standard power-of-two check.
        if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) {
            throw new IllegalArgumentException("segmentCount must be a positive power of 2: " + segmentCount);
        }
        this.segmentBits = Integer.numberOfTrailingZeros(segmentCount);
        this.segmentMask = segmentCount - 1;
        this.segments = new Segment[segmentCount];
        for (int i = 0; i < segmentCount; i++) {
            segments[i] = new Segment(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT);
        }
    }

    /**
     * Routes a key to its segment: mixes the key bits (so sequential ids spread
     * across segments) and selects from the top bits of the mixed hash.
     */
    private Segment segmentFor(long key) {
        return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask];
    }

    // ---- Read operations (read-lock) ----

    /**
     * Returns the value for {@code key}, or {@code defaultReturnValue()} (0L unless
     * changed) if the key is absent.
     */
    @Override
    public long get(long key) {
        Segment seg = segmentFor(key);
        seg.lock.readLock().lock();
        try {
            return seg.map.get(key);
        } finally {
            seg.lock.readLock().unlock();
        }
    }

    @Override
    public long getOrDefault(long key, long defaultValue) {
        Segment seg = segmentFor(key);
        seg.lock.readLock().lock();
        try {
            return seg.map.getOrDefault(key, defaultValue);
        } finally {
            seg.lock.readLock().unlock();
        }
    }

    @Override
    public boolean containsKey(long key) {
        Segment seg = segmentFor(key);
        seg.lock.readLock().lock();
        try {
            return seg.map.containsKey(key);
        } finally {
            seg.lock.readLock().unlock();
        }
    }

    /**
     * Scans all segments. Only weakly consistent: segments are locked one at a
     * time, so the result reflects no single point-in-time snapshot.
     */
    @Override
    public boolean containsValue(long value) {
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                if (seg.map.containsValue(value)) {
                    return true;
                }
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return false;
    }

    /**
     * Sums segment sizes (weakly consistent across segments), saturating at
     * Integer.MAX_VALUE.
     */
    @Override
    public int size() {
        long total = 0;
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                total += seg.map.size();
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return (int) Math.min(total, Integer.MAX_VALUE);
    }

    @Override
    public boolean isEmpty() {
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                if (!seg.map.isEmpty()) {
                    return false;
                }
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return true;
    }

    // ---- Write operations (write-lock) ----

    /**
     * Associates {@code value} with {@code key}.
     *
     * @return the previous value, or {@code defaultReturnValue()} if the key was absent
     */
    @Override
    public long put(long key, long value) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            return seg.map.put(key, value);
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Removes the mapping for {@code key}.
     *
     * @return the removed value, or {@code defaultReturnValue()} if the key was absent
     */
    @Override
    public long remove(long key) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            return seg.map.remove(key);
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    @Override
    public long putIfAbsent(long key, long value) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            return seg.map.putIfAbsent(key, value);
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    @Override
    public boolean replace(long key, long oldValue, long newValue) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            return seg.map.replace(key, oldValue, newValue);
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    @Override
    public long replace(long key, long value) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            return seg.map.replace(key, value);
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Clears segment by segment. Not a single atomic operation: concurrent writers
     * may repopulate already-cleared segments before the loop finishes.
     */
    @Override
    public void clear() {
        for (Segment seg : segments) {
            seg.lock.writeLock().lock();
            try {
                seg.map.clear();
            } finally {
                seg.lock.writeLock().unlock();
            }
        }
    }

    /**
     * Copies all entries from {@code m}. Each put is individually atomic; the
     * bulk operation as a whole is not.
     */
    @Override
    public void putAll(Map<? extends Long, ? extends Long> m) {
        for (Map.Entry<? extends Long, ? extends Long> entry : m.entrySet()) {
            put(entry.getKey().longValue(), entry.getValue().longValue());
        }
    }

    // ---- Atomic compound operations ----
    // Override ALL compound methods from both Long2LongMap and Map interfaces.
    // The inherited default implementations would perform get/put as separately
    // locked calls, losing atomicity.

    /**
     * Atomically adds the given increment to the value associated with the key.
     * If the key is not present, the entry is created with the increment as value
     * (starting from defaultReturnValue, which is 0L by default).
     *
     * @return the new value after the increment
     */
    public long addTo(long key, long increment) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            // fastutil's addTo returns the PREVIOUS value (defaultReturnValue if
            // absent), so old + increment is the value now stored in the map.
            long newValue = seg.map.addTo(key, increment) + increment;
            return newValue;
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Atomically computes and stores a value if the key is absent. Note: the
     * computed value is always stored, matching the fastutil primitive-map
     * contract (a primitive long cannot signal "no mapping" with null).
     */
    @Override
    public long computeIfAbsent(long key, LongUnaryOperator mappingFunction) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            if (seg.map.containsKey(key)) {
                return seg.map.get(key);
            }
            long newValue = mappingFunction.applyAsLong(key);
            seg.map.put(key, newValue);
            return newValue;
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /** Same as {@link #computeIfAbsent(long, LongUnaryOperator)} with a fastutil function type. */
    @Override
    public long computeIfAbsent(long key, Long2LongFunction mappingFunction) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            if (seg.map.containsKey(key)) {
                return seg.map.get(key);
            }
            long newValue = mappingFunction.get(key);
            seg.map.put(key, newValue);
            return newValue;
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Boxed {@link Map#computeIfAbsent} variant: if the function returns null,
     * no mapping is recorded and null is returned, per the Map contract.
     */
    @Override
    public Long computeIfAbsent(Long key, Function<? super Long, ? extends Long> mappingFunction) {
        long k = key.longValue();
        Segment seg = segmentFor(k);
        seg.lock.writeLock().lock();
        try {
            if (seg.map.containsKey(k)) {
                return seg.map.get(k);
            }
            Long newValue = mappingFunction.apply(key);
            if (newValue != null) {
                seg.map.put(k, newValue.longValue());
            }
            return newValue;
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Atomically remaps an existing key. A null result removes the mapping.
     *
     * @return the new value, or {@code defaultReturnValue()} if the key was absent
     *         or the mapping was removed
     */
    @Override
    public long computeIfPresent(long key,
            BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            if (!seg.map.containsKey(key)) {
                return defaultReturnValue();
            }
            long oldValue = seg.map.get(key);
            Long newValue = remappingFunction.apply(key, oldValue);
            if (newValue != null) {
                seg.map.put(key, newValue.longValue());
                return newValue;
            } else {
                seg.map.remove(key);
                return defaultReturnValue();
            }
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Atomically computes a new mapping for the key. The function receives null
     * when the key is absent; a null result removes any existing mapping.
     *
     * @return the new value, or {@code defaultReturnValue()} if no mapping remains
     */
    @Override
    public long compute(long key, BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            Long oldValue = seg.map.containsKey(key) ? seg.map.get(key) : null;
            Long newValue = remappingFunction.apply(key, oldValue);
            if (newValue != null) {
                seg.map.put(key, newValue.longValue());
                return newValue;
            } else if (oldValue != null) {
                seg.map.remove(key);
            }
            return defaultReturnValue();
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Atomically merges: stores {@code value} if the key is absent, otherwise
     * remaps old and given values; a null result removes the mapping.
     */
    @Override
    public long merge(long key, long value,
            BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            if (!seg.map.containsKey(key)) {
                seg.map.put(key, value);
                return value;
            }
            long oldValue = seg.map.get(key);
            Long newValue = remappingFunction.apply(oldValue, value);
            if (newValue != null) {
                seg.map.put(key, newValue.longValue());
                return newValue;
            } else {
                seg.map.remove(key);
                return defaultReturnValue();
            }
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    /**
     * Primitive merge variant: never removes (a primitive result cannot be null).
     */
    @Override
    public long mergeLong(long key, long value, java.util.function.LongBinaryOperator remappingFunction) {
        Segment seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            if (!seg.map.containsKey(key)) {
                seg.map.put(key, value);
                return value;
            }
            long oldValue = seg.map.get(key);
            long newValue = remappingFunction.applyAsLong(oldValue, value);
            seg.map.put(key, newValue);
            return newValue;
        } finally {
            seg.lock.writeLock().unlock();
        }
    }

    // ---- Iteration (weakly consistent snapshots) ----
    // NOTE: unlike java.util.Map views, these return detached COPIES; mutating
    // the returned collections does not affect this map, and vice versa.

    @Override
    public ObjectSet<Long2LongMap.Entry> long2LongEntrySet() {
        ObjectOpenHashSet<Long2LongMap.Entry> snapshot = new ObjectOpenHashSet<>();
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                // Copy into detached BasicEntry objects; the segment's own entry
                // objects may be reused/invalidated by its iterator.
                for (Long2LongMap.Entry entry : seg.map.long2LongEntrySet()) {
                    snapshot.add(new AbstractLong2LongMap.BasicEntry(entry.getLongKey(), entry.getLongValue()));
                }
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return snapshot;
    }

    @Override
    public LongSet keySet() {
        LongOpenHashSet snapshot = new LongOpenHashSet();
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                snapshot.addAll(seg.map.keySet());
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return snapshot;
    }

    /**
     * Returns the keys as a {@link LongArrayList}.
     */
    public LongArrayList keyList() {
        // size() is only a sizing hint here; the actual contents may differ if
        // the map is concurrently modified between the two passes.
        LongArrayList list = new LongArrayList(size());
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                list.addAll(seg.map.keySet());
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return list;
    }

    @Override
    public it.unimi.dsi.fastutil.longs.LongCollection values() {
        LongArrayList snapshot = new LongArrayList();
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                snapshot.addAll(seg.map.values());
            } finally {
                seg.lock.readLock().unlock();
            }
        }
        return snapshot;
    }

    /**
     * Applies the given action to each entry under read-lock per segment.
     * The action must not mutate this map (same segment would deadlock-free but
     * violate the read lock's intent) and should be short, since it runs while
     * holding the segment's read lock.
     */
    public void forEach(LongLongConsumer action) {
        for (Segment seg : segments) {
            seg.lock.readLock().lock();
            try {
                for (Long2LongMap.Entry entry : seg.map.long2LongEntrySet()) {
                    action.accept(entry.getLongKey(), entry.getLongValue());
                }
            } finally {
                seg.lock.readLock().unlock();
            }
        }
    }

    /** Primitive (key, value) consumer used by {@link #forEach(LongLongConsumer)}. */
    @FunctionalInterface
    public interface LongLongConsumer {
        void accept(long key, long value);
    }

    // ---- Segment inner class ----

    /** One lock-striped shard: a plain fastutil map guarded by a read-write lock. */
    private static final class Segment {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        final Long2LongOpenHashMap map;

        Segment(int initialCapacity) {
            this.map = new Long2LongOpenHashMap(initialCapacity);
        }
    }
}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java
new file mode 100644
index 00000000000..a599d9f712c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectFunction;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/**
+ * A concurrent map with primitive long keys and object values, backed by 
segmented
+ * {@link Long2ObjectOpenHashMap} instances with {@link 
ReentrantReadWriteLock} per segment.
+ *
+ * <p>This class provides similar concurrency guarantees to {@link 
java.util.concurrent.ConcurrentHashMap}
+ * while avoiding the memory overhead of boxing long keys. For a cluster with 
millions of tablet entries,
+ * this saves ~32 bytes per entry compared to {@code ConcurrentHashMap<Long, 
V>}.
+ *
+ * <p>Iteration methods ({@link #long2ObjectEntrySet()}, {@link #keySet()}, 
{@link #values()})
+ * return snapshot copies and are weakly consistent.
+ *
+ * <p><b>Important:</b> All compound operations (computeIfAbsent, 
computeIfPresent, compute, merge)
+ * from both {@link Long2ObjectMap} and {@link Map} interfaces are overridden 
to ensure atomicity
+ * within a segment. The default interface implementations would call get/put 
as separate locked
+ * operations, breaking atomicity.
+ *
+ * @param <V> the type of mapped values
+ */
+public class ConcurrentLong2ObjectHashMap<V> extends AbstractLong2ObjectMap<V> 
{
+
+    private static final int DEFAULT_SEGMENT_COUNT = 16;
+    private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16;
+
+    private final Segment<V>[] segments;
+    private final int segmentMask;
+    private final int segmentBits;
+
+    public ConcurrentLong2ObjectHashMap() {
+        this(DEFAULT_SEGMENT_COUNT);
+    }
+
+    @SuppressWarnings("unchecked")
+    public ConcurrentLong2ObjectHashMap(int segmentCount) {
+        if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) {
+            throw new IllegalArgumentException("segmentCount must be a 
positive power of 2: " + segmentCount);
+        }
+        this.segmentBits = Integer.numberOfTrailingZeros(segmentCount);
+        this.segmentMask = segmentCount - 1;
+        this.segments = new Segment[segmentCount];
+        for (int i = 0; i < segmentCount; i++) {
+            segments[i] = new Segment<>(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT);
+        }
+    }
+
+    private Segment<V> segmentFor(long key) {
+        return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & 
segmentMask];
+    }
+
+    // ---- Read operations (read-lock) ----
+
+    @Override
+    public V get(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.get(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public V getOrDefault(long key, V defaultValue) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.getOrDefault(key, defaultValue);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsKey(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.containsKey(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (seg.map.containsValue(value)) {
+                    return true;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int size() {
+        long total = 0;
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                total += seg.map.size();
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return (int) Math.min(total, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (!seg.map.isEmpty()) {
+                    return false;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return true;
+    }
+
+    // ---- Write operations (write-lock) ----
+
+    @Override
+    public V put(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.put(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V remove(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.remove(key);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V putIfAbsent(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.putIfAbsent(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean replace(long key, V oldValue, V newValue) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, oldValue, newValue);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V replace(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void clear() {
+        for (Segment<V> seg : segments) {
+            seg.lock.writeLock().lock();
+            try {
+                seg.map.clear();
+            } finally {
+                seg.lock.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends Long, ? extends V> m) {
+        for (Map.Entry<? extends Long, ? extends V> entry : m.entrySet()) {
+            put(entry.getKey().longValue(), entry.getValue());
+        }
+    }
+
+    // ---- Atomic compound operations ----
+    // Override ALL compound methods from both Long2ObjectMap and Map 
interfaces
+    // to ensure the check-then-act is atomic within a segment's write lock.
+
+    @Override
+    public V computeIfAbsent(long key, LongFunction<? extends V> 
mappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V val = seg.map.get(key);
+            if (val != null || seg.map.containsKey(key)) {
+                return val;
+            }
+            V newValue = mappingFunction.apply(key);
+            seg.map.put(key, newValue);
+            return newValue;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V computeIfAbsent(long key, Long2ObjectFunction<? extends V> 
mappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V val = seg.map.get(key);
+            if (val != null || seg.map.containsKey(key)) {
+                return val;
+            }
+            V newValue = mappingFunction.get(key);
+            seg.map.put(key, newValue);
+            return newValue;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V computeIfAbsent(Long key, Function<? super Long, ? extends V> 
mappingFunction) {
+        return computeIfAbsent(key.longValue(), (long k) -> 
mappingFunction.apply(k));
+    }
+
+    @Override
+    public V computeIfPresent(long key, BiFunction<? super Long, ? super V, ? 
extends V> remappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V oldValue = seg.map.get(key);
+            if (oldValue != null || seg.map.containsKey(key)) {
+                V newValue = remappingFunction.apply(key, oldValue);
+                if (newValue != null) {
+                    seg.map.put(key, newValue);
+                } else {
+                    seg.map.remove(key);
+                }
+                return newValue;
+            }
+            return null;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V compute(long key, BiFunction<? super Long, ? super V, ? extends 
V> remappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V oldValue = seg.map.containsKey(key) ? seg.map.get(key) : null;
+            V newValue = remappingFunction.apply(key, oldValue);
+            if (newValue != null) {
+                seg.map.put(key, newValue);
+            } else if (seg.map.containsKey(key)) {
+                seg.map.remove(key);
+            }
+            return newValue;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V merge(long key, V value, BiFunction<? super V, ? super V, ? 
extends V> remappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V oldValue = seg.map.get(key);
+            V newValue;
+            if (oldValue != null || seg.map.containsKey(key)) {
+                newValue = remappingFunction.apply(oldValue, value);
+            } else {
+                newValue = value;
+            }
+            if (newValue != null) {
+                seg.map.put(key, newValue);
+            } else {
+                seg.map.remove(key);
+            }
+            return newValue;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    // ---- Iteration (weakly consistent snapshots) ----
+
+    @Override
+    public ObjectSet<Long2ObjectMap.Entry<V>> long2ObjectEntrySet() {
+        ObjectOpenHashSet<Long2ObjectMap.Entry<V>> snapshot = new 
ObjectOpenHashSet<>();
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                for (Long2ObjectMap.Entry<V> entry : 
seg.map.long2ObjectEntrySet()) {
+                    snapshot.add(new 
AbstractLong2ObjectMap.BasicEntry<>(entry.getLongKey(), entry.getValue()));
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return snapshot;
+    }
+
+    @Override
+    public LongSet keySet() {
+        LongOpenHashSet snapshot = new LongOpenHashSet();
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                snapshot.addAll(seg.map.keySet());
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return snapshot;
+    }
+
+    /**
+     * Returns the keys as a {@link LongArrayList}. Useful when callers need 
indexed access
+     * or will iterate the keys once. Snapshot-based and weakly consistent.
+     */
+    public LongArrayList keyList() {
+        LongArrayList list = new LongArrayList(size());
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                list.addAll(seg.map.keySet());
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return list;
+    }
+
+    @Override
+    public ObjectCollection<V> values() {
+        ObjectArrayList<V> snapshot = new ObjectArrayList<>();
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                snapshot.addAll(seg.map.values());
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return snapshot;
+    }
+
+    /**
+     * Applies the given action to each entry under read-lock per segment.
+     * This is more efficient than iterating {@link #long2ObjectEntrySet()} as 
it avoids
+     * creating a snapshot.
+     */
+    public void forEach(LongObjConsumer<? super V> action) {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                for (Long2ObjectMap.Entry<V> entry : 
seg.map.long2ObjectEntrySet()) {
+                    action.accept(entry.getLongKey(), entry.getValue());
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+    }
+
+    @FunctionalInterface
+    public interface LongObjConsumer<V> {
+        void accept(long key, V value);
+    }
+
+    // ---- Segment inner class ----
+
+    private static final class Segment<V> {
+        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        final Long2ObjectOpenHashMap<V> map;
+
+        Segment(int initialCapacity) {
+            this.map = new Long2ObjectOpenHashMap<>(initialCapacity);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java
new file mode 100644
index 00000000000..4bc749d1501
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import com.google.gson.Gson;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class ConcurrentLong2LongHashMapTest {
+
+    @Test
+    void testPutAndGet() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        Assertions.assertEquals(100L, map.get(1L));
+        map.put(1L, 200L);
+        Assertions.assertEquals(200L, map.get(1L));
+    }
+
+    @Test
+    void testGetMissingKeyReturnsDefaultReturnValue() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        // Default return value is 0L
+        Assertions.assertEquals(0L, map.get(999L));
+    }
+
+    @Test
+    void testGetOrDefault() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        Assertions.assertEquals(100L, map.getOrDefault(1L, -1L));
+        Assertions.assertEquals(-1L, map.getOrDefault(2L, -1L));
+    }
+
+    @Test
+    void testRemove() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        Assertions.assertEquals(100L, map.remove(1L));
+        Assertions.assertFalse(map.containsKey(1L));
+        // Remove non-existent key returns defaultReturnValue
+        Assertions.assertEquals(0L, map.remove(1L));
+    }
+
+    @Test
+    void testContainsKey() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Assertions.assertFalse(map.containsKey(1L));
+        map.put(1L, 0L);
+        Assertions.assertTrue(map.containsKey(1L));
+    }
+
+    @Test
+    void testContainsValue() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        Assertions.assertTrue(map.containsValue(100L));
+        Assertions.assertFalse(map.containsValue(300L));
+    }
+
+    @Test
+    void testSizeAndIsEmpty() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        Assertions.assertFalse(map.isEmpty());
+        Assertions.assertEquals(2, map.size());
+    }
+
+    @Test
+    void testClear() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        map.clear();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+    }
+
+    @Test
+    void testPutAll() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Map<Long, Long> source = new HashMap<>();
+        source.put(1L, 100L);
+        source.put(2L, 200L);
+        source.put(3L, 300L);
+        map.putAll(source);
+        Assertions.assertEquals(3, map.size());
+        Assertions.assertEquals(200L, map.get(2L));
+    }
+
+    @Test
+    void testPutIfAbsent() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Assertions.assertEquals(0L, map.putIfAbsent(1L, 100L));
+        Assertions.assertEquals(100L, map.putIfAbsent(1L, 200L));
+        Assertions.assertEquals(100L, map.get(1L));
+    }
+
+    @Test
+    void testComputeIfAbsent() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        long val = map.computeIfAbsent(1L, k -> k * 10);
+        Assertions.assertEquals(10L, val);
+        // Should not recompute
+        long val2 = map.computeIfAbsent(1L, k -> k * 20);
+        Assertions.assertEquals(10L, val2);
+    }
+
+    // ---- addTo tests ----
+
+    @Test
+    void testAddToNewKey() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        long result = map.addTo(1L, 5L);
+        Assertions.assertEquals(5L, result);
+        Assertions.assertEquals(5L, map.get(1L));
+    }
+
+    @Test
+    void testAddToExistingKey() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 10L);
+        long result = map.addTo(1L, 5L);
+        Assertions.assertEquals(15L, result);
+        Assertions.assertEquals(15L, map.get(1L));
+    }
+
+    @Test
+    void testAddToNegative() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 10L);
+        long result = map.addTo(1L, -3L);
+        Assertions.assertEquals(7L, result);
+    }
+
+    // ---- Iteration tests ----
+
+    @Test
+    void testEntrySet() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+
+        ObjectSet<Long2LongMap.Entry> entries = map.long2LongEntrySet();
+        Assertions.assertEquals(2, entries.size());
+
+        Set<Long> keys = new HashSet<>();
+        for (Long2LongMap.Entry entry : entries) {
+            keys.add(entry.getLongKey());
+        }
+        Assertions.assertTrue(keys.contains(1L));
+        Assertions.assertTrue(keys.contains(2L));
+    }
+
+    @Test
+    void testKeySet() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(10L, 100L);
+        map.put(20L, 200L);
+        LongSet keys = map.keySet();
+        Assertions.assertEquals(2, keys.size());
+        Assertions.assertTrue(keys.contains(10L));
+        Assertions.assertTrue(keys.contains(20L));
+    }
+
+    @Test
+    void testValues() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        it.unimi.dsi.fastutil.longs.LongCollection values = map.values();
+        Assertions.assertEquals(2, values.size());
+        Assertions.assertTrue(values.contains(100L));
+        Assertions.assertTrue(values.contains(200L));
+    }
+
+    @Test
+    void testForEach() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        Map<Long, Long> collected = new HashMap<>();
+        map.forEach(collected::put);
+        Assertions.assertEquals(2, collected.size());
+        Assertions.assertEquals(100L, (long) collected.get(1L));
+    }
+
+    // ---- Large map test ----
+
+    @Test
+    void testLargeMap() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int count = 100_000;
+        for (long i = 0; i < count; i++) {
+            map.put(i, i * 3);
+        }
+        Assertions.assertEquals(count, map.size());
+        for (long i = 0; i < count; i++) {
+            Assertions.assertEquals(i * 3, map.get(i));
+        }
+    }
+
+    @Test
+    void testCustomSegmentCount() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(4);
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, i);
+        }
+        Assertions.assertEquals(1000, map.size());
+    }
+
+    @Test
+    void testInvalidSegmentCount() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2LongHashMap(3));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2LongHashMap(0));
+    }
+
+    // ---- Concurrency tests ----
+
+    @Test
+    void testConcurrentPuts() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int threads = 8;
+        int keysPerThread = 10_000;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                for (int i = 0; i < keysPerThread; i++) {
+                    long key = (long) threadId * keysPerThread + i;
+                    map.put(key, key * 2);
+                }
+                latch.countDown();
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(threads * keysPerThread, map.size());
+    }
+
+    @Test
+    void testConcurrentAddTo() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int threads = 16;
+        int incrementsPerThread = 10_000;
+        long key = 42L;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+
+        for (int t = 0; t < threads; t++) {
+            executor.submit(() -> {
+                for (int i = 0; i < incrementsPerThread; i++) {
+                    map.addTo(key, 1L);
+                }
+                latch.countDown();
+            });
+        }
+        latch.await();
+        executor.shutdown();
+
+        Assertions.assertEquals((long) threads * incrementsPerThread, 
map.get(key));
+    }
+
+    @Test
+    void testConcurrentReadWrite() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, i);
+        }
+
+        int threads = 8;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+        AtomicInteger errors = new AtomicInteger();
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < 5000; i++) {
+                        long key = i % 1000;
+                        if (threadId % 2 == 0) {
+                            map.get(key);
+                            map.containsKey(key);
+                        } else {
+                            map.put(key + 1000L * threadId, (long) i);
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(0, errors.get());
+    }
+
+    @Test
+    void testConcurrentComputeIfAbsent() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int threads = 16;
+        long sharedKey = 42L;
+        AtomicInteger computeCount = new AtomicInteger();
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        List<Future<Long>> futures = new ArrayList<>();
+
+        for (int t = 0; t < threads; t++) {
+            futures.add(executor.submit(() ->
+                    map.computeIfAbsent(sharedKey, k -> {
+                        computeCount.incrementAndGet();
+                        return k * 10;
+                    })
+            ));
+        }
+        Set<Long> results = new HashSet<>();
+        for (Future<Long> f : futures) {
+            results.add(f.get());
+        }
+        executor.shutdown();
+
+        Assertions.assertEquals(1, results.size());
+        Assertions.assertTrue(results.contains(420L));
+        Assertions.assertEquals(1, computeCount.get());
+    }
+
+    @Test
+    void testConcurrentIterationDuringModification() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, i);
+        }
+
+        int threads = 4;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+        AtomicInteger errors = new AtomicInteger();
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < 100; i++) {
+                        if (threadId % 2 == 0) {
+                            map.keySet();
+                            map.values();
+                            map.long2LongEntrySet();
+                        } else {
+                            map.put(1000L + threadId * 100 + i, (long) i);
+                            map.remove((long) (i % 500));
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(0, errors.get());
+    }
+
+    // ---- Gson serialization tests ----
+
+    @Test
+    void testGsonRoundTrip() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(100L, 1000L);
+        map.put(200L, 2000L);
+
+        String json = GsonUtils.GSON.toJson(map);
+
+        ConcurrentLong2LongHashMap deserialized = 
GsonUtils.GSON.fromJson(json, ConcurrentLong2LongHashMap.class);
+
+        Assertions.assertEquals(2, deserialized.size());
+        Assertions.assertEquals(1000L, deserialized.get(100L));
+        Assertions.assertEquals(2000L, deserialized.get(200L));
+    }
+
+    @Test
+    void testGsonFormatCompatibleWithConcurrentHashMap() {
+        ConcurrentHashMap<Long, Long> chm = new ConcurrentHashMap<>();
+        chm.put(1L, 100L);
+        chm.put(2L, 200L);
+        String chmJson = new Gson().toJson(chm);
+
+        ConcurrentLong2LongHashMap fastMap = new ConcurrentLong2LongHashMap();
+        fastMap.put(1L, 100L);
+        fastMap.put(2L, 200L);
+        String fastJson = GsonUtils.GSON.toJson(fastMap);
+
+        Gson gson = new Gson();
+        Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class);
+        Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class);
+        Assertions.assertEquals(chmParsed, fastParsed);
+    }
+
+    @Test
+    void testDefaultReturnValueBehavior() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        // Primitive get returns 0L (defaultReturnValue) for missing keys
+        Assertions.assertEquals(0L, map.get(999L));
+
+        // Store 0L explicitly
+        map.put(1L, 0L);
+        Assertions.assertTrue(map.containsKey(1L));
+        Assertions.assertEquals(0L, map.get(1L));
+
+        // Boxed get via Map<Long,Long> interface returns null for missing keys
+        Long boxedResult = map.getOrDefault(999L, map.defaultReturnValue());
+        Assertions.assertEquals(0L, boxedResult);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java
new file mode 100644
index 00000000000..88f1a45e7e3
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class ConcurrentLong2ObjectHashMapTest {
+
+    @Test
+    void testPutAndGet() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertNull(map.put(1L, "one"));
+        Assertions.assertEquals("one", map.get(1L));
+        Assertions.assertEquals("one", map.put(1L, "ONE"));
+        Assertions.assertEquals("ONE", map.get(1L));
+    }
+
+    @Test
+    void testGetMissingKey() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertNull(map.get(999L));
+    }
+
+    @Test
+    void testGetOrDefault() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        Assertions.assertEquals("one", map.getOrDefault(1L, "default"));
+        Assertions.assertEquals("default", map.getOrDefault(2L, "default"));
+    }
+
+    @Test
+    void testRemove() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        Assertions.assertEquals("one", map.remove(1L));
+        Assertions.assertNull(map.get(1L));
+        Assertions.assertNull(map.remove(1L));
+    }
+
+    @Test
+    void testContainsKey() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertFalse(map.containsKey(1L));
+        map.put(1L, "one");
+        Assertions.assertTrue(map.containsKey(1L));
+    }
+
+    @Test
+    void testContainsValue() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        Assertions.assertTrue(map.containsValue("one"));
+        Assertions.assertFalse(map.containsValue("three"));
+    }
+
+    @Test
+    void testSizeAndIsEmpty() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+        map.put(1L, "one");
+        map.put(2L, "two");
+        Assertions.assertFalse(map.isEmpty());
+        Assertions.assertEquals(2, map.size());
+    }
+
+    @Test
+    void testClear() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        map.clear();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+    }
+
+    @Test
+    void testPutAll() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Map<Long, String> source = new HashMap<>();
+        source.put(1L, "one");
+        source.put(2L, "two");
+        source.put(3L, "three");
+        map.putAll(source);
+        Assertions.assertEquals(3, map.size());
+        Assertions.assertEquals("two", map.get(2L));
+    }
+
+    @Test
+    void testPutIfAbsent() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertNull(map.putIfAbsent(1L, "one"));
+        Assertions.assertEquals("one", map.putIfAbsent(1L, "ONE"));
+        Assertions.assertEquals("one", map.get(1L));
+    }
+
+    @Test
+    void testComputeIfAbsent() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        String val = map.computeIfAbsent(1L, k -> "computed-" + k);
+        Assertions.assertEquals("computed-1", val);
+        // Should not recompute
+        String val2 = map.computeIfAbsent(1L, k -> "recomputed-" + k);
+        Assertions.assertEquals("computed-1", val2);
+    }
+
+    @Test
+    void testComputeIfPresent() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        // Not present — should return null
+        Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> v + 
"-updated"));
+
+        map.put(1L, "one");
+        String val = map.computeIfPresent(1L, (k, v) -> v + "-updated");
+        Assertions.assertEquals("one-updated", val);
+        Assertions.assertEquals("one-updated", map.get(1L));
+
+        // Return null to remove
+        Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> null));
+        Assertions.assertFalse(map.containsKey(1L));
+    }
+
+    @Test
+    void testEntrySet() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+
+        ObjectSet<Long2ObjectMap.Entry<String>> entries = 
map.long2ObjectEntrySet();
+        Assertions.assertEquals(2, entries.size());
+
+        Set<Long> keys = new HashSet<>();
+        for (Long2ObjectMap.Entry<String> entry : entries) {
+            keys.add(entry.getLongKey());
+        }
+        Assertions.assertTrue(keys.contains(1L));
+        Assertions.assertTrue(keys.contains(2L));
+    }
+
+    @Test
+    void testKeySet() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(10L, "ten");
+        map.put(20L, "twenty");
+        LongSet keys = map.keySet();
+        Assertions.assertEquals(2, keys.size());
+        Assertions.assertTrue(keys.contains(10L));
+        Assertions.assertTrue(keys.contains(20L));
+    }
+
+    @Test
+    void testValues() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        ObjectCollection<String> values = map.values();
+        Assertions.assertEquals(2, values.size());
+        Assertions.assertTrue(values.contains("one"));
+        Assertions.assertTrue(values.contains("two"));
+    }
+
+    @Test
+    void testStream() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        for (long i = 0; i < 100; i++) {
+            map.put(i, "val-" + i);
+        }
+        long count = map.values().stream().filter(v -> 
v.startsWith("val-")).count();
+        Assertions.assertEquals(100, count);
+    }
+
+    @Test
+    void testForEach() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        Map<Long, String> collected = new HashMap<>();
+        map.forEach(collected::put);
+        Assertions.assertEquals(2, collected.size());
+        Assertions.assertEquals("one", collected.get(1L));
+    }
+
+    @Test
+    void testNullValues() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, null);
+        Assertions.assertTrue(map.containsKey(1L));
+        Assertions.assertNull(map.get(1L));
+    }
+
+    @Test
+    void testLargeMap() {
+        ConcurrentLong2ObjectHashMap<Long> map = new 
ConcurrentLong2ObjectHashMap<>();
+        int count = 100_000;
+        for (long i = 0; i < count; i++) {
+            map.put(i, i * 2);
+        }
+        Assertions.assertEquals(count, map.size());
+        for (long i = 0; i < count; i++) {
+            Assertions.assertEquals(Long.valueOf(i * 2), map.get(i));
+        }
+    }
+
+    @Test
+    void testCustomSegmentCount() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>(4);
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, "v" + i);
+        }
+        Assertions.assertEquals(1000, map.size());
+    }
+
+    @Test
+    void testInvalidSegmentCount() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2ObjectHashMap<>(3));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2ObjectHashMap<>(0));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2ObjectHashMap<>(-1));
+    }
+
+    // ---- Concurrency tests ----
+
+    @Test
+    void testConcurrentPuts() throws Exception {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        int threads = 8;
+        int keysPerThread = 10_000;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                for (int i = 0; i < keysPerThread; i++) {
+                    long key = (long) threadId * keysPerThread + i;
+                    map.put(key, "t" + threadId + "-" + i);
+                }
+                latch.countDown();
+            });
+        }
+        latch.await();
+        executor.shutdown();
+
+        Assertions.assertEquals(threads * keysPerThread, map.size());
+    }
+
+    @Test
+    void testConcurrentReadWrite() throws Exception {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        // Pre-populate
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, "v" + i);
+        }
+
+        int threads = 8;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+        AtomicInteger errors = new AtomicInteger();
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < 5000; i++) {
+                        long key = i % 1000;
+                        if (threadId % 2 == 0) {
+                            // Reader
+                            map.get(key);
+                            map.containsKey(key);
+                        } else {
+                            // Writer
+                            map.put(key + 1000L * threadId, "new-" + i);
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(0, errors.get());
+    }
+
+    @Test
+    void testConcurrentComputeIfAbsent() throws Exception {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        int threads = 16;
+        long sharedKey = 42L;
+        AtomicInteger computeCount = new AtomicInteger();
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        List<Future<String>> futures = new ArrayList<>();
+
+        for (int t = 0; t < threads; t++) {
+            futures.add(executor.submit(() ->
+                    map.computeIfAbsent(sharedKey, k -> {
+                        computeCount.incrementAndGet();
+                        return "computed";
+                    })
+            ));
+        }
+        Set<String> results = new HashSet<>();
+        for (Future<String> f : futures) {
+            results.add(f.get());
+        }
+        executor.shutdown();
+
+        // All threads should get the same value
+        Assertions.assertEquals(1, results.size());
+        Assertions.assertTrue(results.contains("computed"));
+        // The function should have been called exactly once
+        Assertions.assertEquals(1, computeCount.get());
+    }
+
+    @Test
+    void testConcurrentIterationDuringModification() throws Exception {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, "v" + i);
+        }
+
+        int threads = 4;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+        AtomicInteger errors = new AtomicInteger();
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < 100; i++) {
+                        if (threadId % 2 == 0) {
+                            // Iterator - should not throw
+                            map.keySet();
+                            map.values();
+                            map.long2ObjectEntrySet();
+                        } else {
+                            // Modifier
+                            map.put(1000L + threadId * 100 + i, "new");
+                            map.remove((long) (i % 500));
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(0, errors.get());
+    }
+
+    // ---- Gson serialization tests ----
+
+    @Test
+    void testGsonRoundTrip() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(100L, "hundred");
+        map.put(200L, "two-hundred");
+
+        String json = GsonUtils.GSON.toJson(map);
+
+        Type type = new TypeToken<ConcurrentLong2ObjectHashMap<String>>() 
{}.getType();
+        ConcurrentLong2ObjectHashMap<String> deserialized = 
GsonUtils.GSON.fromJson(json, type);
+
+        Assertions.assertEquals(2, deserialized.size());
+        Assertions.assertEquals("hundred", deserialized.get(100L));
+        Assertions.assertEquals("two-hundred", deserialized.get(200L));
+    }
+
+    @Test
+    void testGsonFormatCompatibleWithConcurrentHashMap() {
+        // Verify the JSON format matches what ConcurrentHashMap<Long, String> 
produces
+        ConcurrentHashMap<Long, String> chm = new ConcurrentHashMap<>();
+        chm.put(1L, "one");
+        chm.put(2L, "two");
+        String chmJson = new Gson().toJson(chm);
+
+        ConcurrentLong2ObjectHashMap<String> fastMap = new 
ConcurrentLong2ObjectHashMap<>();
+        fastMap.put(1L, "one");
+        fastMap.put(2L, "two");
+        String fastJson = GsonUtils.GSON.toJson(fastMap);
+
+        // Both should be parseable as the same JSON object
+        Gson gson = new Gson();
+        Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class);
+        Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class);
+        Assertions.assertEquals(chmParsed, fastParsed);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to