This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/fastutil-memory-optimization in repository https://gitbox.apache.org/repos/asf/doris.git
commit 538a7b003835db9006265de498d8d150c9c7cda5 Author: Yongqiang YANG <[email protected]> AuthorDate: Thu Mar 12 06:49:56 2026 -0700 [opt](memory) Replace boxed Long-keyed maps with fastutil primitive collections in FE Replace HashMap<Long, V>, ConcurrentHashMap<Long, V>, HashSet<Long> and similar boxed collections with fastutil primitive-type-specialized collections across FE hot paths. This eliminates Long autoboxing overhead and reduces per-entry memory footprint by 3-6x depending on the collection type. Key changes: - Add ConcurrentLong2ObjectHashMap and ConcurrentLong2LongHashMap as thread-safe wrappers over fastutil maps with segment-based locking - Add Gson TypeAdapters for Long2ObjectOpenHashMap, Long2LongOpenHashMap, LongOpenHashSet, and the concurrent variants for serialization compatibility - Replace collections in 27 files across catalog, transaction, statistics, alter, clone, cloud, and load subsystems For a cluster with 1.3M tablets, estimated heap savings: 350-700MB. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../java/org/apache/doris/alter/RollupJobV2.java | 9 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 17 +- .../apache/doris/catalog/MaterializedIndex.java | 7 +- .../java/org/apache/doris/catalog/OlapTable.java | 8 +- .../java/org/apache/doris/catalog/Partition.java | 6 +- .../org/apache/doris/catalog/PartitionInfo.java | 50 +-- .../apache/doris/catalog/TabletInvertedIndex.java | 4 +- .../org/apache/doris/catalog/TempPartitions.java | 4 +- .../org/apache/doris/clone/TabletScheduler.java | 6 +- .../cloud/catalog/CloudTabletInvertedIndex.java | 4 +- .../transaction/CloudGlobalTransactionMgr.java | 17 +- .../transaction/DeleteBitmapUpdateLockContext.java | 39 +- .../doris/common/ConcurrentLong2LongHashMap.java | 486 +++++++++++++++++++++ .../doris/common/ConcurrentLong2ObjectHashMap.java | 443 +++++++++++++++++++ .../main/java/org/apache/doris/load/DeleteJob.java | 8 +- .../doris/load/routineload/RoutineLoadManager.java | 9 +- 
.../org/apache/doris/master/ReportHandler.java | 4 +- .../org/apache/doris/persist/gson/GsonUtils.java | 185 ++++++++ .../org/apache/doris/statistics/AnalysisInfo.java | 7 +- .../apache/doris/statistics/AnalysisManager.java | 3 +- .../apache/doris/statistics/BaseAnalysisTask.java | 6 +- .../org/apache/doris/statistics/ColStatsMeta.java | 5 +- .../apache/doris/statistics/TableStatsMeta.java | 18 +- .../doris/statistics/util/StatisticsUtil.java | 5 +- .../org/apache/doris/task/PublishVersionTask.java | 5 +- .../doris/transaction/DatabaseTransactionMgr.java | 23 +- .../doris/transaction/PublishVersionDaemon.java | 8 +- .../apache/doris/transaction/TableCommitInfo.java | 7 +- .../apache/doris/transaction/TransactionState.java | 40 +- .../common/ConcurrentLong2LongHashMapTest.java | 455 +++++++++++++++++++ .../common/ConcurrentLong2ObjectHashMapTest.java | 432 ++++++++++++++++++ 31 files changed, 2177 insertions(+), 143 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 66bfaedc8a7..7bd5bcb90e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -69,6 +69,9 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -100,9 +103,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { // partition id -> (rollup tablet id -> base tablet id) @SerializedName(value = "partitionIdToBaseRollupTabletIdMap") - protected Map<Long, Map<Long, Long>> partitionIdToBaseRollupTabletIdMap = Maps.newHashMap(); + 
protected Long2ObjectOpenHashMap<Map<Long, Long>> partitionIdToBaseRollupTabletIdMap = new Long2ObjectOpenHashMap<>(); @SerializedName(value = "partitionIdToRollupIndex") - protected Map<Long, MaterializedIndex> partitionIdToRollupIndex = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<MaterializedIndex> partitionIdToRollupIndex = new Long2ObjectOpenHashMap<>(); // rollup and base schema info @SerializedName(value = "baseIndexId") @@ -172,7 +175,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) { Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap - .computeIfAbsent(partitionId, k -> Maps.newHashMap()); + .computeIfAbsent(partitionId, k -> new Long2LongOpenHashMap()); tabletIdMap.put(rollupTabletId, baseTabletId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index a235a84f100..468ccc51aa5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -70,6 +70,9 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.HashBasedTable; @@ -109,22 +112,22 @@ public class SchemaChangeJobV2 extends AlterJobV2 implements GsonPostProcessable protected Table<Long, Long, MaterializedIndex> partitionIndexMap = HashBasedTable.create(); // shadow index id -> origin index id @SerializedName(value = "indexIdMap") - protected Map<Long, Long> indexIdMap = Maps.newHashMap(); + protected Long2LongOpenHashMap 
indexIdMap = new Long2LongOpenHashMap(); // partition id -> origin index id @SerializedName(value = "partitionOriginIndexIdMap") - private Map<Long, Long> partitionOriginIndexIdMap = Maps.newHashMap(); + private Long2LongOpenHashMap partitionOriginIndexIdMap = new Long2LongOpenHashMap(); // shadow index id -> shadow index name(__doris_shadow_xxx) @SerializedName(value = "indexIdToName") - private Map<Long, String> indexIdToName = Maps.newHashMap(); + private Long2ObjectOpenHashMap<String> indexIdToName = new Long2ObjectOpenHashMap<>(); // shadow index id -> index schema @SerializedName(value = "indexSchemaMap") - protected Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<List<Column>> indexSchemaMap = new Long2ObjectOpenHashMap<>(); // shadow index id -> (shadow index schema version : schema hash) @SerializedName(value = "indexSchemaVersionAndHashMap") - protected Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<SchemaVersionAndHash> indexSchemaVersionAndHashMap = new Long2ObjectOpenHashMap<>(); // shadow index id -> shadow index short key count @SerializedName(value = "indexShortKeyMap") - protected Map<Long, Short> indexShortKeyMap = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<Short> indexShortKeyMap = new Long2ObjectOpenHashMap<>(); // bloom filter info @SerializedName(value = "hasBfChange") @@ -168,7 +171,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 implements GsonPostProcessable public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) { Map<Long, Long> tabletMap = partitionIndexTabletMap.get(partitionId, shadowIdxId); if (tabletMap == null) { - tabletMap = Maps.newHashMap(); + tabletMap = new Long2LongOpenHashMap(); partitionIndexTabletMap.put(partitionId, shadowIdxId, tabletMap); } tabletMap.put(shadowTabletId, originTabletId); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index b1c84361313..348cab3e226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -19,11 +19,12 @@ package org.apache.doris.catalog; import org.apache.doris.persist.gson.GsonPostProcessable; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,7 +73,7 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable public MaterializedIndex() { this.state = IndexState.NORMAL; - this.idToTablets = new HashMap<>(); + this.idToTablets = new Long2ObjectOpenHashMap<>(); this.tablets = new ArrayList<>(); } @@ -84,7 +85,7 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable this.state = IndexState.NORMAL; } - this.idToTablets = new HashMap<>(); + this.idToTablets = new Long2ObjectOpenHashMap<>(); this.tablets = new ArrayList<>(); this.rowCount = -1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 4551d3b6049..5f81393c4fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -95,6 +95,8 @@ import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -177,7 +179,7 @@ public class OlapTable 
extends Table implements MTMVRelatedTableIf, GsonPostProc // index id -> index meta @SerializedName(value = "itm", alternate = {"indexIdToMeta"}) - private Map<Long, MaterializedIndexMeta> indexIdToMeta = Maps.newHashMap(); + private Long2ObjectOpenHashMap<MaterializedIndexMeta> indexIdToMeta = new Long2ObjectOpenHashMap<>(); // index name -> index id @SerializedName(value = "inti", alternate = {"indexNameToId"}) private Map<String, Long> indexNameToId = Maps.newHashMap(); @@ -841,7 +843,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc // reset all 'indexIdToXXX' map Map<Long, MaterializedIndexMeta> origIdxIdToMeta = indexIdToMeta; - indexIdToMeta = Maps.newHashMap(); + indexIdToMeta = new Long2ObjectOpenHashMap<>(); for (Map.Entry<Long, String> entry : origIdxIdToName.entrySet()) { long newIdxId = env.getNextId(); if (entry.getValue().equals(name)) { @@ -3859,7 +3861,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc table.tempPartitions = new TempPartitions(); table.state = state; - table.indexIdToMeta = ImmutableMap.copyOf(indexIdToMeta); + table.indexIdToMeta = new Long2ObjectOpenHashMap<>(indexIdToMeta); table.indexNameToId = ImmutableMap.copyOf(indexNameToId); table.keysType = keysType; table.partitionInfo = partitionInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index d6a3545088c..de0dd9ee5c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -25,6 +25,8 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.rpc.RpcException; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -68,14 +70,14 @@ 
public class Partition extends MetaObject { * User can do query on them, show them in related 'show' stmt. */ @SerializedName(value = "ivr", alternate = {"idToVisibleRollupIndex"}) - private Map<Long, MaterializedIndex> idToVisibleRollupIndex = Maps.newHashMap(); + private Long2ObjectOpenHashMap<MaterializedIndex> idToVisibleRollupIndex = new Long2ObjectOpenHashMap<>(); /** * Shadow indexes are indexes which are not visible to user. * Query will not run on these shadow indexes, and user can not see them neither. * But load process will load data into these shadow indexes. */ @SerializedName(value = "isi", alternate = {"idToShadowIndex"}) - private Map<Long, MaterializedIndex> idToShadowIndex = Maps.newHashMap(); + private Long2ObjectOpenHashMap<MaterializedIndex> idToShadowIndex = new Long2ObjectOpenHashMap<>(); /** * committed version(hash): after txn is committed, set committed version(hash) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 89e6c637a3a..77e4357f679 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -29,16 +29,16 @@ import org.apache.doris.common.DdlException; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTabletType; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -58,30 +58,30 @@ public class PartitionInfo { protected List<Column> partitionColumns = Lists.newArrayList(); // 
formal partition id -> partition item @SerializedName("IdToItem") - protected Map<Long, PartitionItem> idToItem = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<PartitionItem> idToItem = new Long2ObjectOpenHashMap<>(); @SerializedName("IdToTempItem") // temp partition id -> partition item - protected Map<Long, PartitionItem> idToTempItem = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<PartitionItem> idToTempItem = new Long2ObjectOpenHashMap<>(); // partition id -> data property @SerializedName("IdToDataProperty") - protected Map<Long, DataProperty> idToDataProperty; + protected Long2ObjectOpenHashMap<DataProperty> idToDataProperty; // partition id -> storage policy @SerializedName("IdToStoragePolicy") - protected Map<Long, String> idToStoragePolicy; + protected Long2ObjectOpenHashMap<String> idToStoragePolicy; // partition id -> replication allocation @SerializedName("IdToReplicaAllocation") - protected Map<Long, ReplicaAllocation> idToReplicaAllocation; + protected Long2ObjectOpenHashMap<ReplicaAllocation> idToReplicaAllocation; // true if the partition has multi partition columns @SerializedName("isM") protected boolean isMultiColumnPartition = false; @SerializedName("IdToInMemory") - protected Map<Long, Boolean> idToInMemory; + protected Long2ObjectOpenHashMap<Boolean> idToInMemory; // partition id -> tablet type // Note: currently it's only used for testing, it may change/add more meta field later, // so we defer adding meta serialization until memory engine feature is more complete. 
- protected Map<Long, TTabletType> idToTabletType; + protected Long2ObjectOpenHashMap<TTabletType> idToTabletType; // the enable automatic partition will hold this, could create partition by expr result @SerializedName("PartitionExprs") @@ -92,21 +92,21 @@ public class PartitionInfo { public PartitionInfo() { this.type = PartitionType.UNPARTITIONED; - this.idToDataProperty = new HashMap<>(); - this.idToReplicaAllocation = new HashMap<>(); - this.idToInMemory = new HashMap<>(); - this.idToTabletType = new HashMap<>(); - this.idToStoragePolicy = new HashMap<>(); + this.idToDataProperty = new Long2ObjectOpenHashMap<>(); + this.idToReplicaAllocation = new Long2ObjectOpenHashMap<>(); + this.idToInMemory = new Long2ObjectOpenHashMap<>(); + this.idToTabletType = new Long2ObjectOpenHashMap<>(); + this.idToStoragePolicy = new Long2ObjectOpenHashMap<>(); this.partitionExprs = new ArrayList<>(); } public PartitionInfo(PartitionType type) { this.type = type; - this.idToDataProperty = new HashMap<>(); - this.idToReplicaAllocation = new HashMap<>(); - this.idToInMemory = new HashMap<>(); - this.idToTabletType = new HashMap<>(); - this.idToStoragePolicy = new HashMap<>(); + this.idToDataProperty = new Long2ObjectOpenHashMap<>(); + this.idToReplicaAllocation = new Long2ObjectOpenHashMap<>(); + this.idToInMemory = new Long2ObjectOpenHashMap<>(); + this.idToTabletType = new Long2ObjectOpenHashMap<>(); + this.idToStoragePolicy = new Long2ObjectOpenHashMap<>(); this.partitionExprs = new ArrayList<>(); } @@ -150,7 +150,7 @@ public class PartitionInfo { * @return both normal partition and temp partition */ public Map<Long, PartitionItem> getAllPartitions() { - HashMap all = new HashMap<>(); + Long2ObjectOpenHashMap<PartitionItem> all = new Long2ObjectOpenHashMap<>(); all.putAll(idToTempItem); all.putAll(idToItem); return all; @@ -420,11 +420,11 @@ public class PartitionInfo { Map<Long, PartitionItem> origIdToItem = idToItem; Map<Long, Boolean> origIdToInMemory = idToInMemory; Map<Long, 
String> origIdToStoragePolicy = idToStoragePolicy; - idToDataProperty = Maps.newHashMap(); - idToReplicaAllocation = Maps.newHashMap(); - idToItem = Maps.newHashMap(); - idToInMemory = Maps.newHashMap(); - idToStoragePolicy = Maps.newHashMap(); + idToDataProperty = new Long2ObjectOpenHashMap<>(); + idToReplicaAllocation = new Long2ObjectOpenHashMap<>(); + idToItem = new Long2ObjectOpenHashMap<>(); + idToInMemory = new Long2ObjectOpenHashMap<>(); + idToStoragePolicy = new Long2ObjectOpenHashMap<>(); for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) { idToDataProperty.put(entry.getKey(), origIdToDataProperty.get(entry.getValue())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 71b99a0a862..0490f618d02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -26,6 +26,8 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTablet; import org.apache.doris.thrift.TTabletMetaInfo; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; @@ -60,7 +62,7 @@ public abstract class TabletInvertedIndex { private StampedLock lock = new StampedLock(); // tablet id -> tablet meta - protected Map<Long, TabletMeta> tabletMetaMap = Maps.newHashMap(); + protected Long2ObjectOpenHashMap<TabletMeta> tabletMetaMap = new Long2ObjectOpenHashMap<>(); public TabletInvertedIndex() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java index 4cecf55f06b..655ba1f8ee8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java @@ -19,6 +19,8 @@ package org.apache.doris.catalog; import org.apache.doris.persist.gson.GsonPostProcessable; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -38,7 +40,7 @@ import java.util.Set; // to make a overwrite load. public class TempPartitions implements GsonPostProcessable { @SerializedName(value = "idToPartition") - private Map<Long, Partition> idToPartition = Maps.newHashMap(); + private Long2ObjectOpenHashMap<Partition> idToPartition = new Long2ObjectOpenHashMap<>(); private Map<String, Partition> nameToPartition = Maps.newHashMap(); @Deprecated // the range info of temp partitions has been moved to "partitionInfo" in OlapTable. diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index a15830efe01..ff5c7192f5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -63,6 +63,8 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.transaction.TransactionState; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.EvictingQueue; @@ -118,9 +120,9 @@ public class TabletScheduler extends MasterDaemon { * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized' */ private MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = MinMaxPriorityQueue.create(); - private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap(); + private Long2ObjectOpenHashMap<TabletSchedCtx.Type> allTabletTypes = new Long2ObjectOpenHashMap<>(); // 
contains all tabletCtxs which state are RUNNING - private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap(); + private Long2ObjectOpenHashMap<TabletSchedCtx> runningTablets = new Long2ObjectOpenHashMap<>(); // save the latest 1000 scheduled tablet info private Queue<TabletSchedCtx> schedHistory = EvictingQueue.create(1000); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java index af6a368f3f9..4cc78ec5d32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java @@ -20,6 +20,8 @@ package org.apache.doris.cloud.catalog; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -34,7 +36,7 @@ public class CloudTabletInvertedIndex extends TabletInvertedIndex { // tablet id -> replica // for cloud mode, no need to know the replica's backend - private Map<Long, Replica> replicaMetaMap = Maps.newHashMap(); + private Long2ObjectOpenHashMap<Replica> replicaMetaMap = new Long2ObjectOpenHashMap<>(); public CloudTabletInvertedIndex() { super(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 4fe2e9cc61a..6058d5ab5c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -68,6 +68,7 @@ import org.apache.doris.cloud.proto.Cloud.TxnStatusPB; import 
org.apache.doris.cloud.proto.Cloud.UniqueIdPB; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.FeNameFormat; @@ -204,14 +205,14 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } private TxnStateCallbackFactory callbackFactory; - private final Map<Long, Long> subTxnIdToTxnId = new ConcurrentHashMap<>(); + private final ConcurrentLong2LongHashMap subTxnIdToTxnId = new ConcurrentLong2LongHashMap(); private Map<Long, AtomicInteger> waitToCommitTxnCountMap = new ConcurrentHashMap<>(); private Map<Long, CommitCostTimeStatistic> commitCostTimeStatisticMap = new ConcurrentHashMap<>(); // tableId -> txnId - private Map<Long, Long> lastTxnIdMap = Maps.newConcurrentMap(); + private ConcurrentLong2LongHashMap lastTxnIdMap = new ConcurrentLong2LongHashMap(); // txnId -> signature - private Map<Long, Long> txnLastSignatureMap = Maps.newConcurrentMap(); + private ConcurrentLong2LongHashMap txnLastSignatureMap = new ConcurrentLong2LongHashMap(); private final AutoPartitionCacheManager autoPartitionCacheManager = new AutoPartitionCacheManager(); @@ -2561,13 +2562,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } private void cleanSubTransactions(long transactionId) { - Iterator<Entry<Long, Long>> iterator = subTxnIdToTxnId.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<Long, Long> entry = iterator.next(); - if (entry.getValue() == transactionId) { - iterator.remove(); + subTxnIdToTxnId.forEach((subTxnId, txnId) -> { + if (txnId == transactionId) { + subTxnIdToTxnId.remove(subTxnId); } - } + }); } public Pair<Long, TransactionState> beginSubTxn(long txnId, long dbId, Set<Long> tableIds, String label, diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java index 80b8d0f9d2d..f69c9309dcf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java @@ -20,7 +20,8 @@ package org.apache.doris.cloud.transaction; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TabletMeta; -import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import java.util.List; import java.util.Map; @@ -28,27 +29,27 @@ import java.util.Set; public class DeleteBitmapUpdateLockContext { private long lockId; - private Map<Long, Long> baseCompactionCnts; - private Map<Long, Long> cumulativeCompactionCnts; - private Map<Long, Long> cumulativePoints; - private Map<Long, Long> tabletStates; - private Map<Long, Set<Long>> tableToPartitions; - private Map<Long, Partition> partitions; - private Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets; - private Map<Long, List<Long>> tableToTabletList; - private Map<Long, TabletMeta> tabletToTabletMeta; + private Long2LongOpenHashMap baseCompactionCnts; + private Long2LongOpenHashMap cumulativeCompactionCnts; + private Long2LongOpenHashMap cumulativePoints; + private Long2LongOpenHashMap tabletStates; + private Long2ObjectOpenHashMap<Set<Long>> tableToPartitions; + private Long2ObjectOpenHashMap<Partition> partitions; + private Long2ObjectOpenHashMap<Map<Long, Set<Long>>> backendToPartitionTablets; + private Long2ObjectOpenHashMap<List<Long>> tableToTabletList; + private Long2ObjectOpenHashMap<TabletMeta> tabletToTabletMeta; public DeleteBitmapUpdateLockContext(long lockId) { this.lockId = lockId; - baseCompactionCnts = Maps.newHashMap(); - 
cumulativeCompactionCnts = Maps.newHashMap(); - cumulativePoints = Maps.newHashMap(); - tabletStates = Maps.newHashMap(); - tableToPartitions = Maps.newHashMap(); - partitions = Maps.newHashMap(); - backendToPartitionTablets = Maps.newHashMap(); - tableToTabletList = Maps.newHashMap(); - tabletToTabletMeta = Maps.newHashMap(); + baseCompactionCnts = new Long2LongOpenHashMap(); + cumulativeCompactionCnts = new Long2LongOpenHashMap(); + cumulativePoints = new Long2LongOpenHashMap(); + tabletStates = new Long2LongOpenHashMap(); + tableToPartitions = new Long2ObjectOpenHashMap<>(); + partitions = new Long2ObjectOpenHashMap<>(); + backendToPartitionTablets = new Long2ObjectOpenHashMap<>(); + tableToTabletList = new Long2ObjectOpenHashMap<>(); + tabletToTabletMeta = new Long2ObjectOpenHashMap<>(); } public long getLockId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java new file mode 100644 index 00000000000..0cb8de92dcd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java @@ -0,0 +1,486 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common; + +import it.unimi.dsi.fastutil.HashCommon; +import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongFunction; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongBinaryOperator; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; + +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.LongUnaryOperator; + +/** + * A concurrent map with primitive long keys and primitive long values, backed by segmented + * {@link Long2LongOpenHashMap} instances with {@link ReentrantReadWriteLock} per segment. + * + * <p>This class saves ~48 bytes per entry compared to {@code ConcurrentHashMap<Long, Long>} + * by avoiding boxing of both keys and values. For fields like partition update row counts + * with millions of entries, this translates to hundreds of MB of heap savings. + * + * <p>The {@link #addTo(long, long)} method provides atomic increment semantics, useful for + * counter patterns. + * + * <p><b>Important:</b> All compound operations from both {@link Long2LongMap} and {@link Map} + * interfaces are overridden to ensure atomicity within a segment's write lock. 
+ */ +public class ConcurrentLong2LongHashMap extends AbstractLong2LongMap { + + private static final int DEFAULT_SEGMENT_COUNT = 16; + private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16; + + private final Segment[] segments; + private final int segmentMask; + private final int segmentBits; + + public ConcurrentLong2LongHashMap() { + this(DEFAULT_SEGMENT_COUNT); + } + + public ConcurrentLong2LongHashMap(int segmentCount) { + if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) { + throw new IllegalArgumentException("segmentCount must be a positive power of 2: " + segmentCount); + } + this.segmentBits = Integer.numberOfTrailingZeros(segmentCount); + this.segmentMask = segmentCount - 1; + this.segments = new Segment[segmentCount]; + for (int i = 0; i < segmentCount; i++) { + segments[i] = new Segment(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT); + } + } + + private Segment segmentFor(long key) { + return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask]; + } + + // ---- Read operations (read-lock) ---- + + @Override + public long get(long key) { + Segment seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.get(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public long getOrDefault(long key, long defaultValue) { + Segment seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.getOrDefault(key, defaultValue); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsKey(long key) { + Segment seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.containsKey(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsValue(long value) { + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + if (seg.map.containsValue(value)) { + return true; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return false; + } + + @Override + 
public int size() { + long total = 0; + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + total += seg.map.size(); + } finally { + seg.lock.readLock().unlock(); + } + } + return (int) Math.min(total, Integer.MAX_VALUE); + } + + @Override + public boolean isEmpty() { + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + if (!seg.map.isEmpty()) { + return false; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return true; + } + + // ---- Write operations (write-lock) ---- + + @Override + public long put(long key, long value) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.put(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long remove(long key) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.remove(key); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long putIfAbsent(long key, long value) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.putIfAbsent(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public boolean replace(long key, long oldValue, long newValue) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, oldValue, newValue); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long replace(long key, long value) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public void clear() { + for (Segment seg : segments) { + seg.lock.writeLock().lock(); + try { + seg.map.clear(); + } finally { + seg.lock.writeLock().unlock(); + } + } + } + + @Override + public void putAll(Map<? extends Long, ? extends Long> m) { + for (Map.Entry<? extends Long, ? 
extends Long> entry : m.entrySet()) { + put(entry.getKey().longValue(), entry.getValue().longValue()); + } + } + + // ---- Atomic compound operations ---- + // Override ALL compound methods from both Long2LongMap and Map interfaces. + + /** + * Atomically adds the given increment to the value associated with the key. + * If the key is not present, the entry is created with the increment as value + * (starting from defaultReturnValue, which is 0L by default). + * + * @return the new value after the increment + */ + public long addTo(long key, long increment) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + long newValue = seg.map.addTo(key, increment) + increment; + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long computeIfAbsent(long key, LongUnaryOperator mappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (seg.map.containsKey(key)) { + return seg.map.get(key); + } + long newValue = mappingFunction.applyAsLong(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long computeIfAbsent(long key, Long2LongFunction mappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (seg.map.containsKey(key)) { + return seg.map.get(key); + } + long newValue = mappingFunction.get(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public Long computeIfAbsent(Long key, Function<? super Long, ? 
extends Long> mappingFunction) { + long k = key.longValue(); + Segment seg = segmentFor(k); + seg.lock.writeLock().lock(); + try { + if (seg.map.containsKey(k)) { + return seg.map.get(k); + } + Long newValue = mappingFunction.apply(key); + if (newValue != null) { + seg.map.put(k, newValue.longValue()); + } + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long computeIfPresent(long key, + BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (!seg.map.containsKey(key)) { + return defaultReturnValue(); + } + long oldValue = seg.map.get(key); + Long newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue.longValue()); + return newValue; + } else { + seg.map.remove(key); + return defaultReturnValue(); + } + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long compute(long key, BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + Long oldValue = seg.map.containsKey(key) ? seg.map.get(key) : null; + Long newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue.longValue()); + return newValue; + } else if (oldValue != null) { + seg.map.remove(key); + } + return defaultReturnValue(); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long merge(long key, long value, + BiFunction<? super Long, ? super Long, ? 
extends Long> remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (!seg.map.containsKey(key)) { + seg.map.put(key, value); + return value; + } + long oldValue = seg.map.get(key); + Long newValue = remappingFunction.apply(oldValue, value); + if (newValue != null) { + seg.map.put(key, newValue.longValue()); + return newValue; + } else { + seg.map.remove(key); + return defaultReturnValue(); + } + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long mergeLong(long key, long value, java.util.function.LongBinaryOperator remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (!seg.map.containsKey(key)) { + seg.map.put(key, value); + return value; + } + long oldValue = seg.map.get(key); + long newValue = remappingFunction.applyAsLong(oldValue, value); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + // ---- Iteration (weakly consistent snapshots) ---- + + @Override + public ObjectSet<Long2LongMap.Entry> long2LongEntrySet() { + ObjectOpenHashSet<Long2LongMap.Entry> snapshot = new ObjectOpenHashSet<>(); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2LongMap.Entry entry : seg.map.long2LongEntrySet()) { + snapshot.add(new AbstractLong2LongMap.BasicEntry(entry.getLongKey(), entry.getLongValue())); + } + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + @Override + public LongSet keySet() { + LongOpenHashSet snapshot = new LongOpenHashSet(); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Returns the keys as a {@link LongArrayList}. 
+ */ + public LongArrayList keyList() { + LongArrayList list = new LongArrayList(size()); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + list.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return list; + } + + @Override + public it.unimi.dsi.fastutil.longs.LongCollection values() { + LongArrayList snapshot = new LongArrayList(); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.values()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Applies the given action to each entry under read-lock per segment. + */ + public void forEach(LongLongConsumer action) { + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2LongMap.Entry entry : seg.map.long2LongEntrySet()) { + action.accept(entry.getLongKey(), entry.getLongValue()); + } + } finally { + seg.lock.readLock().unlock(); + } + } + } + + @FunctionalInterface + public interface LongLongConsumer { + void accept(long key, long value); + } + + // ---- Segment inner class ---- + + private static final class Segment { + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + final Long2LongOpenHashMap map; + + Segment(int initialCapacity) { + this.map = new Long2LongOpenHashMap(initialCapacity); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java new file mode 100644 index 00000000000..a599d9f712c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import it.unimi.dsi.fastutil.HashCommon; +import it.unimi.dsi.fastutil.longs.AbstractLong2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectFunction; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectCollection; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; + +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.LongFunction; + +/** + * A concurrent map with primitive long keys and object values, backed by segmented + * {@link Long2ObjectOpenHashMap} instances with {@link ReentrantReadWriteLock} per segment. + * + * <p>This class provides similar concurrency guarantees to {@link java.util.concurrent.ConcurrentHashMap} + * while avoiding the memory overhead of boxing long keys. For a cluster with millions of tablet entries, + * this saves ~32 bytes per entry compared to {@code ConcurrentHashMap<Long, V>}. 
+ * + * <p>Iteration methods ({@link #long2ObjectEntrySet()}, {@link #keySet()}, {@link #values()}) + * return snapshot copies and are weakly consistent. + * + * <p><b>Important:</b> All compound operations (computeIfAbsent, computeIfPresent, compute, merge) + * from both {@link Long2ObjectMap} and {@link Map} interfaces are overridden to ensure atomicity + * within a segment. The default interface implementations would call get/put as separate locked + * operations, breaking atomicity. + * + * @param <V> the type of mapped values + */ +public class ConcurrentLong2ObjectHashMap<V> extends AbstractLong2ObjectMap<V> { + + private static final int DEFAULT_SEGMENT_COUNT = 16; + private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16; + + private final Segment<V>[] segments; + private final int segmentMask; + private final int segmentBits; + + public ConcurrentLong2ObjectHashMap() { + this(DEFAULT_SEGMENT_COUNT); + } + + @SuppressWarnings("unchecked") + public ConcurrentLong2ObjectHashMap(int segmentCount) { + if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) { + throw new IllegalArgumentException("segmentCount must be a positive power of 2: " + segmentCount); + } + this.segmentBits = Integer.numberOfTrailingZeros(segmentCount); + this.segmentMask = segmentCount - 1; + this.segments = new Segment[segmentCount]; + for (int i = 0; i < segmentCount; i++) { + segments[i] = new Segment<>(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT); + } + } + + private Segment<V> segmentFor(long key) { + return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask]; + } + + // ---- Read operations (read-lock) ---- + + @Override + public V get(long key) { + Segment<V> seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.get(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public V getOrDefault(long key, V defaultValue) { + Segment<V> seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + 
return seg.map.getOrDefault(key, defaultValue); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsKey(long key) { + Segment<V> seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.containsKey(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsValue(Object value) { + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + if (seg.map.containsValue(value)) { + return true; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return false; + } + + @Override + public int size() { + long total = 0; + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + total += seg.map.size(); + } finally { + seg.lock.readLock().unlock(); + } + } + return (int) Math.min(total, Integer.MAX_VALUE); + } + + @Override + public boolean isEmpty() { + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + if (!seg.map.isEmpty()) { + return false; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return true; + } + + // ---- Write operations (write-lock) ---- + + @Override + public V put(long key, V value) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.put(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V remove(long key) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.remove(key); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V putIfAbsent(long key, V value) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.putIfAbsent(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public boolean replace(long key, V oldValue, V newValue) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, oldValue, newValue); + } finally { + 
seg.lock.writeLock().unlock(); + } + } + + @Override + public V replace(long key, V value) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public void clear() { + for (Segment<V> seg : segments) { + seg.lock.writeLock().lock(); + try { + seg.map.clear(); + } finally { + seg.lock.writeLock().unlock(); + } + } + } + + @Override + public void putAll(Map<? extends Long, ? extends V> m) { + for (Map.Entry<? extends Long, ? extends V> entry : m.entrySet()) { + put(entry.getKey().longValue(), entry.getValue()); + } + } + + // ---- Atomic compound operations ---- + // Override ALL compound methods from both Long2ObjectMap and Map interfaces + // to ensure the check-then-act is atomic within a segment's write lock. + + @Override + public V computeIfAbsent(long key, LongFunction<? extends V> mappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V val = seg.map.get(key); + if (val != null || seg.map.containsKey(key)) { + return val; + } + V newValue = mappingFunction.apply(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V computeIfAbsent(long key, Long2ObjectFunction<? extends V> mappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V val = seg.map.get(key); + if (val != null || seg.map.containsKey(key)) { + return val; + } + V newValue = mappingFunction.get(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V computeIfAbsent(Long key, Function<? super Long, ? extends V> mappingFunction) { + return computeIfAbsent(key.longValue(), (long k) -> mappingFunction.apply(k)); + } + + @Override + public V computeIfPresent(long key, BiFunction<? super Long, ? super V, ? 
extends V> remappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V oldValue = seg.map.get(key); + if (oldValue != null || seg.map.containsKey(key)) { + V newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue); + } else { + seg.map.remove(key); + } + return newValue; + } + return null; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V compute(long key, BiFunction<? super Long, ? super V, ? extends V> remappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V oldValue = seg.map.containsKey(key) ? seg.map.get(key) : null; + V newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue); + } else if (seg.map.containsKey(key)) { + seg.map.remove(key); + } + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V merge(long key, V value, BiFunction<? super V, ? super V, ? 
extends V> remappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V oldValue = seg.map.get(key); + V newValue; + if (oldValue != null || seg.map.containsKey(key)) { + newValue = remappingFunction.apply(oldValue, value); + } else { + newValue = value; + } + if (newValue != null) { + seg.map.put(key, newValue); + } else { + seg.map.remove(key); + } + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + // ---- Iteration (weakly consistent snapshots) ---- + + @Override + public ObjectSet<Long2ObjectMap.Entry<V>> long2ObjectEntrySet() { + ObjectOpenHashSet<Long2ObjectMap.Entry<V>> snapshot = new ObjectOpenHashSet<>(); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2ObjectMap.Entry<V> entry : seg.map.long2ObjectEntrySet()) { + snapshot.add(new AbstractLong2ObjectMap.BasicEntry<>(entry.getLongKey(), entry.getValue())); + } + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + @Override + public LongSet keySet() { + LongOpenHashSet snapshot = new LongOpenHashSet(); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Returns the keys as a {@link LongArrayList}. Useful when callers need indexed access + * or will iterate the keys once. Snapshot-based and weakly consistent. 
+ */ + public LongArrayList keyList() { + LongArrayList list = new LongArrayList(size()); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + list.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return list; + } + + @Override + public ObjectCollection<V> values() { + ObjectArrayList<V> snapshot = new ObjectArrayList<>(); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.values()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Applies the given action to each entry under read-lock per segment. + * This is more efficient than iterating {@link #long2ObjectEntrySet()} as it avoids + * creating a snapshot. + */ + public void forEach(LongObjConsumer<? super V> action) { + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2ObjectMap.Entry<V> entry : seg.map.long2ObjectEntrySet()) { + action.accept(entry.getLongKey(), entry.getValue()); + } + } finally { + seg.lock.readLock().unlock(); + } + } + } + + @FunctionalInterface + public interface LongObjConsumer<V> { + void accept(long key, V value); + } + + // ---- Segment inner class ---- + + private static final class Segment<V> { + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + final Long2ObjectOpenHashMap<V> map; + + Segment(int initialCapacity) { + this.map = new Long2ObjectOpenHashMap<>(initialCapacity); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 99e22aa2a08..1ab8879dde8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -70,6 +70,8 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; +import 
it.unimi.dsi.fastutil.longs.LongOpenHashSet; + import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -133,9 +135,9 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ this.transactionId = transactionId; this.label = label; this.deleteInfo = deleteInfo; - totalTablets = Sets.newHashSet(); - finishedTablets = Sets.newHashSet(); - quorumTablets = Sets.newHashSet(); + totalTablets = new LongOpenHashSet(); + finishedTablets = new LongOpenHashSet(); + quorumTablets = new LongOpenHashSet(); tabletDeleteInfoMap = Maps.newConcurrentMap(); pushTasks = Sets.newHashSet(); state = DeleteState.UN_QUORUM; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 3b1f498bcfc..1cd341edcbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -87,12 +88,12 @@ public class RoutineLoadManager implements Writable { private Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap(); private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); - private ConcurrentHashMap<Long, Long> multiLoadTaskTxnIdToRoutineLoadJobId = new ConcurrentHashMap<>(); + private ConcurrentLong2LongHashMap multiLoadTaskTxnIdToRoutineLoadJobId = new ConcurrentLong2LongHashMap(); private ReentrantReadWriteLock lock = 
new ReentrantReadWriteLock(true); // Map<beId, timestamp when added to blacklist> - private Map<Long, Long> blacklist = new ConcurrentHashMap<>(); + private ConcurrentLong2LongHashMap blacklist = new ConcurrentLong2LongHashMap(); private void readLock() { lock.readLock().lock(); @@ -964,10 +965,10 @@ public class RoutineLoadManager implements Writable { } public boolean isInBlacklist(long beId) { - Long timestamp = blacklist.get(beId); - if (timestamp == null) { + if (!blacklist.containsKey(beId)) { return false; } + long timestamp = blacklist.get(beId); if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000) { blacklist.remove(beId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 93d0d10b055..1c1893e7197 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -95,6 +95,8 @@ import org.apache.doris.thrift.TTabletMetaInfo; import org.apache.doris.thrift.TTaskType; import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; @@ -263,7 +265,7 @@ public class ReportHandler extends Daemon { } private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) { - Map<Long, TTablet> tabletMap = Maps.newHashMap(); + Map<Long, TTablet> tabletMap = new Long2ObjectOpenHashMap<>(); for (TTablet tTablet : tabletList) { if (tTablet.getTabletInfos().isEmpty()) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 15250aecc8a..c4273a0fdc3 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -126,6 +126,8 @@ import org.apache.doris.cloud.catalog.CloudTablet; import org.apache.doris.cloud.datasource.CloudInternalCatalog; import org.apache.doris.cloud.load.CloudBrokerLoadJob; import org.apache.doris.cloud.load.CopyJob; +import org.apache.doris.common.ConcurrentLong2LongHashMap; +import org.apache.doris.common.ConcurrentLong2ObjectHashMap; import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.RangeUtils; @@ -221,6 +223,12 @@ import org.apache.doris.system.FrontendHbResponse; import org.apache.doris.system.HeartbeatResponse; import org.apache.doris.transaction.TxnCommitAttachment; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; + import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; @@ -651,6 +659,16 @@ public class GsonUtils { .registerTypeAdapterFactory(partitionItemTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()) + .registerTypeHierarchyAdapter(ConcurrentLong2ObjectHashMap.class, + new ConcurrentLong2ObjectHashMapAdapter()) + .registerTypeAdapter(ConcurrentLong2LongHashMap.class, + new ConcurrentLong2LongHashMapAdapter()) + .registerTypeHierarchyAdapter(Long2ObjectOpenHashMap.class, + new Long2ObjectOpenHashMapAdapter()) + .registerTypeAdapter(Long2LongOpenHashMap.class, + new Long2LongOpenHashMapAdapter()) + .registerTypeAdapter(LongOpenHashSet.class, + new LongOpenHashSetAdapter()) 
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()) .registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer()) .registerTypeAdapter(Range.class, new RangeUtils.RangeSerializer()).setExclusionStrategies( @@ -915,6 +933,173 @@ public class GsonUtils { } } + /** + * Gson adapter for {@link ConcurrentLong2ObjectHashMap}. + * Serializes as {@code {"123": value1, "456": value2}} — identical to ConcurrentHashMap<Long, V> + * for backward compatibility during rolling upgrades. + */ + private static class ConcurrentLong2ObjectHashMapAdapter + implements JsonSerializer<ConcurrentLong2ObjectHashMap<?>>, + JsonDeserializer<ConcurrentLong2ObjectHashMap<?>> { + + @Override + public JsonElement serialize(ConcurrentLong2ObjectHashMap<?> src, Type typeOfSrc, + JsonSerializationContext context) { + JsonObject obj = new JsonObject(); + for (Long2ObjectMap.Entry<?> entry : src.long2ObjectEntrySet()) { + obj.add(String.valueOf(entry.getLongKey()), context.serialize(entry.getValue())); + } + return obj; + } + + @Override + public ConcurrentLong2ObjectHashMap<?> deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + Type valueType = Object.class; + if (typeOfT instanceof ParameterizedType) { + valueType = ((ParameterizedType) typeOfT).getActualTypeArguments()[0]; + } + ConcurrentLong2ObjectHashMap<Object> map = new ConcurrentLong2ObjectHashMap<>(); + JsonObject obj = json.getAsJsonObject(); + for (Map.Entry<String, JsonElement> entry : obj.entrySet()) { + long key = Long.parseLong(entry.getKey()); + Object value = context.deserialize(entry.getValue(), valueType); + map.put(key, value); + } + return map; + } + } + + /** + * Gson adapter for {@link ConcurrentLong2LongHashMap}. + * Serializes as {@code {"123": 456, "789": 0}} — identical to ConcurrentHashMap<Long, Long>. 
+ */ + private static class ConcurrentLong2LongHashMapAdapter + implements JsonSerializer<ConcurrentLong2LongHashMap>, + JsonDeserializer<ConcurrentLong2LongHashMap> { + + @Override + public JsonElement serialize(ConcurrentLong2LongHashMap src, Type typeOfSrc, + JsonSerializationContext context) { + JsonObject obj = new JsonObject(); + for (Long2LongMap.Entry entry : src.long2LongEntrySet()) { + obj.addProperty(String.valueOf(entry.getLongKey()), entry.getLongValue()); + } + return obj; + } + + @Override + public ConcurrentLong2LongHashMap deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + JsonObject obj = json.getAsJsonObject(); + for (Map.Entry<String, JsonElement> entry : obj.entrySet()) { + long key = Long.parseLong(entry.getKey()); + long value = entry.getValue().getAsLong(); + map.put(key, value); + } + return map; + } + } + + /** + * Gson adapter for {@link Long2ObjectOpenHashMap}. + * Serializes as {@code {"123": value1, "456": value2}} — identical to HashMap<Long, V>. 
+ */ + private static class Long2ObjectOpenHashMapAdapter + implements JsonSerializer<Long2ObjectOpenHashMap<?>>, + JsonDeserializer<Long2ObjectOpenHashMap<?>> { + + @Override + public JsonElement serialize(Long2ObjectOpenHashMap<?> src, Type typeOfSrc, + JsonSerializationContext context) { + JsonObject obj = new JsonObject(); + for (Long2ObjectMap.Entry<?> entry : src.long2ObjectEntrySet()) { + obj.add(String.valueOf(entry.getLongKey()), context.serialize(entry.getValue())); + } + return obj; + } + + @Override + public Long2ObjectOpenHashMap<?> deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + Type valueType = Object.class; + if (typeOfT instanceof ParameterizedType) { + valueType = ((ParameterizedType) typeOfT).getActualTypeArguments()[0]; + } + Long2ObjectOpenHashMap<Object> map = new Long2ObjectOpenHashMap<>(); + JsonObject obj = json.getAsJsonObject(); + for (Map.Entry<String, JsonElement> entry : obj.entrySet()) { + long key = Long.parseLong(entry.getKey()); + Object value = context.deserialize(entry.getValue(), valueType); + map.put(key, value); + } + return map; + } + } + + /** + * Gson adapter for {@link Long2LongOpenHashMap}. + * Serializes as {@code {"123": 456, "789": 0}} — identical to HashMap<Long, Long>. 
+ */ + private static class Long2LongOpenHashMapAdapter + implements JsonSerializer<Long2LongOpenHashMap>, + JsonDeserializer<Long2LongOpenHashMap> { + + @Override + public JsonElement serialize(Long2LongOpenHashMap src, Type typeOfSrc, + JsonSerializationContext context) { + JsonObject obj = new JsonObject(); + for (Long2LongMap.Entry entry : src.long2LongEntrySet()) { + obj.addProperty(String.valueOf(entry.getLongKey()), entry.getLongValue()); + } + return obj; + } + + @Override + public Long2LongOpenHashMap deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + JsonObject obj = json.getAsJsonObject(); + for (Map.Entry<String, JsonElement> entry : obj.entrySet()) { + long key = Long.parseLong(entry.getKey()); + long value = entry.getValue().getAsLong(); + map.put(key, value); + } + return map; + } + } + + /** + * Gson adapter for {@link LongOpenHashSet}. + * Serializes as {@code [1, 2, 3]} — identical to HashSet<Long>. 
+ */ + private static class LongOpenHashSetAdapter + implements JsonSerializer<LongOpenHashSet>, + JsonDeserializer<LongOpenHashSet> { + + @Override + public JsonElement serialize(LongOpenHashSet src, Type typeOfSrc, + JsonSerializationContext context) { + JsonArray arr = new JsonArray(); + for (long v : src) { + arr.add(v); + } + return arr; + } + + @Override + public LongOpenHashSet deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + JsonArray arr = json.getAsJsonArray(); + LongOpenHashSet set = new LongOpenHashSet(arr.size()); + for (JsonElement elem : arr) { + set.add(elem.getAsLong()); + } + return set; + } + } + public static class PreProcessTypeAdapterFactory implements TypeAdapterFactory { public PreProcessTypeAdapterFactory() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index 58b2c3e3d1f..6efc596cd88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -39,8 +40,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.StringJoiner; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; public class AnalysisInfo implements Writable { @@ -186,7 +185,7 @@ public class AnalysisInfo implements Writable { @SerializedName("tv") public final long tableVersion; - public final Map<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>(); + public final ConcurrentLong2LongHashMap partitionUpdateRows = new ConcurrentLong2LongHashMap(); 
@SerializedName("tblUpdateTime") public final long tblUpdateTime; @@ -200,7 +199,7 @@ public class AnalysisInfo implements Writable { @SerializedName("ep") public final boolean enablePartition; - public final ConcurrentMap<Long, Long> indexesRowCount = new ConcurrentHashMap<>(); + public final ConcurrentLong2LongHashMap indexesRowCount = new ConcurrentLong2LongHashMap(); public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId, Set<Pair<String, String>> jobColumns, Set<String> partitionNames, String colName, Long indexId, diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 228ebd3f51c..b024921e930 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.View; import org.apache.doris.catalog.info.PartitionNamesInfo; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; @@ -1370,7 +1371,7 @@ public class AnalysisManager implements Writable { Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows); int tabletCount = tabletToRows.size(); if (tableStats.partitionUpdateRows == null) { - tableStats.partitionUpdateRows = new ConcurrentHashMap<>(); + tableStats.partitionUpdateRows = new ConcurrentLong2LongHashMap(); } for (Partition p : partitions) { MaterializedIndex baseIndex = p.getBaseIndex(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 71bd7feeea6..3656faace9c 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ConcurrentLong2LongHashMap;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
@@ -56,7 +57,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 
 public abstract class BaseAnalysisTask {
 
@@ -421,9 +421,13 @@ public abstract class BaseAnalysisTask {
             // Skip partitions that not changed after last analyze.
             // External table getPartition always return null. So external table doesn't skip any partitions.
             if (partition != null && tableStatsStatus != null && tableStatsStatus.partitionUpdateRows != null) {
-                ConcurrentMap<Long, Long> tableUpdateRows = tableStatsStatus.partitionUpdateRows;
+                ConcurrentLong2LongHashMap tableUpdateRows = tableStatsStatus.partitionUpdateRows;
                 if (columnStatsMeta != null && columnStatsMeta.partitionUpdateRows != null) {
-                    ConcurrentMap<Long, Long> columnUpdateRows = columnStatsMeta.partitionUpdateRows;
+                    ConcurrentLong2LongHashMap columnUpdateRows = columnStatsMeta.partitionUpdateRows;
                     long id = partition.getId();
-                    if (Objects.equals(tableUpdateRows.getOrDefault(id, 0L), columnUpdateRows.get(id))) {
+                    // The primitive get() yields 0 (not null) for an absent key, so require the column
+                    // entry to exist first. Without this check a partition that has no column entry
+                    // would compare equal to the table-side default of 0 and be skipped.
+                    if (columnUpdateRows.containsKey(id)
+                            && Objects.equals(tableUpdateRows.getOrDefault(id, 0L), columnUpdateRows.get(id))) {
                         LOG.debug("Partition {} doesn't change after last analyze for column {}, skip it.",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
index 78f51c2ac0c..ef628f05412 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java @@ -17,6 +17,7 @@ package org.apache.doris.statistics; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -24,8 +25,6 @@ import org.apache.doris.statistics.AnalysisInfo.JobType; import com.google.gson.annotations.SerializedName; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; public class ColStatsMeta { @@ -56,7 +55,7 @@ public class ColStatsMeta { public long tableVersion; @SerializedName("pur") - public ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>(); + public ConcurrentLong2LongHashMap partitionUpdateRows = new ConcurrentLong2LongHashMap(); public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType, JobType jobType, long queriedTimes, long rowCount, long updatedRows, diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 0b77aba91e3..9e9f0e48b33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -20,6 +20,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -34,7 +35,6 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import 
java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -100,10 +100,10 @@ public class TableStatsMeta implements Writable, GsonPostProcessable {
     public boolean userInjected;
 
     @SerializedName("pur")
-    public ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();
+    public ConcurrentLong2LongHashMap partitionUpdateRows = new ConcurrentLong2LongHashMap();
 
     @SerializedName("irc")
-    private ConcurrentMap<Long, Long> indexesRowCount = new ConcurrentHashMap<>();
+    private ConcurrentLong2LongHashMap indexesRowCount = new ConcurrentLong2LongHashMap();
 
     @VisibleForTesting
     public TableStatsMeta() {
@@ -184,7 +184,7 @@ public class TableStatsMeta implements Writable, GsonPostProcessable {
                 colStatsMeta.tableVersion = analyzedJob.tableVersion;
                 if (analyzedJob.enablePartition) {
                     if (colStatsMeta.partitionUpdateRows == null) {
-                        colStatsMeta.partitionUpdateRows = new ConcurrentHashMap<>();
+                        colStatsMeta.partitionUpdateRows = new ConcurrentLong2LongHashMap();
                     }
                     colStatsMeta.partitionUpdateRows.putAll(analyzedJob.partitionUpdateRows);
                 }
@@ -224,10 +224,10 @@ public class TableStatsMeta implements Writable, GsonPostProcessable {
     @Override
     public void gsonPostProcess() throws IOException {
         if (partitionUpdateRows == null) {
-            partitionUpdateRows = new ConcurrentHashMap<>();
+            partitionUpdateRows = new ConcurrentLong2LongHashMap();
         }
         if (indexesRowCount == null) {
-            indexesRowCount = new ConcurrentHashMap<>();
+            indexesRowCount = new ConcurrentLong2LongHashMap();
         }
         if (colToColStatsMeta == null) {
             colToColStatsMeta = new ConcurrentHashMap<>();
@@ -239,12 +239,12 @@ public class TableStatsMeta implements Writable, GsonPostProcessable {
     }
 
     protected void clearStaleIndexRowCount(OlapTable table) {
-        Iterator<Long> iterator = indexesRowCount.keySet().iterator();
         List<Long> indexIds = table.getIndexIdList();
-        while (iterator.hasNext()) {
-            long key = iterator.next();
+        // Iterate over an array copy of the keys so that removing entries below can never
+        // disturb the iteration, whatever kind of view keySet() hands back.
+        for (long key : indexesRowCount.keySet().toLongArray()) {
             if (!indexIds.contains(key)) {
-                iterator.remove();
+                indexesRowCount.remove(key);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index c8a52de9e35..d69425010ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -46,6 +46,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.cloud.qe.ComputeGroupException;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ConcurrentLong2LongHashMap;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
@@ -119,7 +120,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentMap;
+
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -1185,7 +1186,7 @@ public class StatisticsUtil {
         if (tableStatsStatus.partitionChanged != null && tableStatsStatus.partitionChanged.get()) {
             return true;
         }
-        ConcurrentMap<Long, Long> partitionUpdateRows = columnStatsMeta.partitionUpdateRows;
+        ConcurrentLong2LongHashMap partitionUpdateRows = columnStatsMeta.partitionUpdateRows;
         if (partitionUpdateRows == null) {
             return true;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 2a369d0cf4c..d0b686b6724 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -21,7 +21,8 @@ import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TPublishVersionRequest;
 import
org.apache.doris.thrift.TTaskType; -import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.common.collect.Sets; import lombok.Getter; import org.apache.logging.log4j.LogManager; @@ -52,7 +53,7 @@ public class PublishVersionTask extends AgentTask { /** * To collect loaded rows for each tablet from each BE */ - private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap(); + private final Long2ObjectOpenHashMap<Map<Long, Long>> tableIdToTabletDeltaRows = new Long2ObjectOpenHashMap<>(); public PublishVersionTask(long backendId, long transactionId, long dbId, List<TPartitionVersionInfo> partitionVersionInfos, long createTime) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index b0a17790170..594b7bfea79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -35,6 +35,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ConcurrentLong2LongHashMap; import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.FeConstants; @@ -66,6 +67,8 @@ import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TUniqueId; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -129,7 +132,7 @@ public class DatabaseTransactionMgr { // transactionId -> final status TransactionState private final 
Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newConcurrentMap();
 
-    private final Map<Long, Long> subTxnIdToTxnId = new ConcurrentHashMap<>();
+    private final ConcurrentLong2LongHashMap subTxnIdToTxnId = new ConcurrentLong2LongHashMap();
 
     // The following 2 queues are to store transactionStates with final status
     // These queues are mainly used to avoid traversing all txns and speed up the cleaning time
@@ -150,7 +153,7 @@ public class DatabaseTransactionMgr {
     // it must exists in dbIdToTxnLabels, and vice versa
     private final Map<String, Set<Long>> labelToTxnIds = Maps.newHashMap();
 
-    private final Map<Long, Long> tableCommittedTxnCount = Maps.newConcurrentMap();
+    private final ConcurrentLong2LongHashMap tableCommittedTxnCount = new ConcurrentLong2LongHashMap();
 
     private Long lastCommittedTxnCountUpdateTime = 0L;
 
@@ -455,7 +458,7 @@ public class DatabaseTransactionMgr {
             return;
         }
 
-        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> errorReplicaIds = new LongOpenHashSet();
         Set<Long> totalInvolvedBackends = Sets.newHashSet();
         Map<Long, Set<Long>> tableToPartition = new HashMap<>();
 
@@ -808,7 +811,7 @@ public class DatabaseTransactionMgr {
             return;
         }
 
-        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> errorReplicaIds = new LongOpenHashSet();
         Set<Long> totalInvolvedBackends = Sets.newHashSet();
         Map<Long, Set<Long>> tableToPartition = new HashMap<>();
         if (!is2PC) {
@@ -873,7 +876,7 @@ public class DatabaseTransactionMgr {
         }
 
         // error replica may be duplicated for different sub transaction, but it's ok
-        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> errorReplicaIds = new LongOpenHashSet();
         Map<Long, Set<Long>> subTxnToPartition = new HashMap<>();
         Set<Long> totalInvolvedBackends = Sets.newHashSet();
         for (SubTransactionState subTransactionState : subTransactionStates) {
@@ -3042,12 +3045,16 @@ public class DatabaseTransactionMgr {
     }
 
     private void cleanSubTransactions(long transactionId) {
-        Iterator<Entry<Long, Long>> iterator = subTxnIdToTxnId.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Entry<Long, Long> entry = iterator.next();
-            if (entry.getValue() == transactionId) {
-                iterator.remove();
+        // Collect the matching keys first and delete them after the traversal, so the map is
+        // never mutated from inside its own forEach callback.
+        LongOpenHashSet staleSubTxnIds = new LongOpenHashSet();
+        subTxnIdToTxnId.forEach((subTxnId, txnId) -> {
+            if (txnId == transactionId) {
+                staleSubTxnIds.add(subTxnId);
             }
-        }
+        });
+        for (long subTxnId : staleSubTxnIds) {
+            subTxnIdToTxnId.remove(subTxnId);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index fcbd6705ac2..736ceff254d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -39,7 +39,9 @@ import org.apache.doris.task.UpdateVisibleVersionTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TTaskType;
 
-import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
 import com.google.common.collect.Sets;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
@@ -66,8 +68,8 @@ public class PublishVersionDaemon extends MasterDaemon {
     private Set<Long> publishingTxnIds = Sets.newConcurrentHashSet();
 
     private final MonitoredReentrantReadWriteLock visibleVersionsLock = new MonitoredReentrantReadWriteLock(true);
-    private Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
-    private Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+    private Long2LongOpenHashMap partitionVisibleVersions = new Long2LongOpenHashMap();
+    private Long2ObjectOpenHashMap<Set<Long>> backendPartitions = new Long2ObjectOpenHashMap<>();
 
     public PublishVersionDaemon() {
         super("PUBLISH_VERSION", Config.publish_version_interval_ms);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java index 412ae065f3e..d0becfe74ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java @@ -19,7 +19,8 @@ package org.apache.doris.transaction; import org.apache.doris.thrift.TPartitionVersionInfo; -import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; + import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +35,7 @@ public class TableCommitInfo { @SerializedName(value = "tableId") private long tableId; @SerializedName(value = "idToPartitionCommitInfo") - private Map<Long, PartitionCommitInfo> idToPartitionCommitInfo; + private Long2ObjectOpenHashMap<PartitionCommitInfo> idToPartitionCommitInfo; @SerializedName(value = "version") private long version; @SerializedName(value = "versionTime") @@ -46,7 +47,7 @@ public class TableCommitInfo { public TableCommitInfo(long tableId) { this.tableId = tableId; - idToPartitionCommitInfo = Maps.newHashMap(); + idToPartitionCommitInfo = new Long2ObjectOpenHashMap<>(); } public long getTableId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 16a4761e340..7a9ce7e5816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -30,6 +30,9 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TUniqueId; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; 
import com.google.common.base.Strings; @@ -48,7 +51,6 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -210,7 +212,7 @@ public class TransactionState implements Writable { // no need to persist it. private TUniqueId requestId; @SerializedName(value = "idToTableCommitInfos") - private Map<Long, TableCommitInfo> idToTableCommitInfos; + private Long2ObjectOpenHashMap<TableCommitInfo> idToTableCommitInfos; // coordinator is show who begin this txn (FE, or one of BE, etc...) @SerializedName(value = "txnCoordinator") private TxnCoordinator txnCoordinator; @@ -230,12 +232,12 @@ public class TransactionState implements Writable { private String reason = ""; // error replica ids @SerializedName(value = "errorReplicas") - private Set<Long> errorReplicas; + private LongOpenHashSet errorReplicas; // this latch will be counted down when txn status change to VISIBLE private CountDownLatch visibleLatch; // this state need not be serialized. the map key is backend_id - private Map<Long, List<PublishVersionTask>> publishVersionTasks; + private Long2ObjectOpenHashMap<List<PublishVersionTask>> publishVersionTasks; private boolean hasSendTask; private TransactionStatus preStatus = null; @@ -280,13 +282,13 @@ public class TransactionState implements Writable { // which tables and rollups it loaded. 
// tbl id -> (index ids) @SerializedName(value = "loadedTblIndexes") - private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap(); + private Long2ObjectOpenHashMap<Set<Long>> loadedTblIndexes = new Long2ObjectOpenHashMap<>(); /** * the value is the num delta rows of all replicas in each tablet */ @SerializedName(value = "deltaRows") - private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap(); + private final Long2ObjectOpenHashMap<Map<Long, Long>> tableIdToTabletDeltaRows = new Long2ObjectOpenHashMap<>(); private String errorLogUrl = null; @@ -311,7 +313,7 @@ public class TransactionState implements Writable { private boolean isPartialUpdate = false; // table id -> schema info - private Map<Long, SchemaInfo> txnSchemas = new HashMap<>(); + private Long2ObjectOpenHashMap<SchemaInfo> txnSchemas = new Long2ObjectOpenHashMap<>(); @Getter @SerializedName(value = "sti") @@ -321,14 +323,14 @@ public class TransactionState implements Writable { private Map<Long, TableCommitInfo> subTxnIdToTableCommitInfo = new TreeMap<>(); @Getter @Setter - private Set<Long> involvedBackends = Sets.newHashSet(); + private Set<Long> involvedBackends = new LongOpenHashSet(); public TransactionState() { this.dbId = -1; this.tableIdList = Lists.newArrayList(); this.transactionId = -1; this.label = ""; - this.idToTableCommitInfos = Maps.newHashMap(); + this.idToTableCommitInfos = new Long2ObjectOpenHashMap<>(); // mocked, to avoid NPE this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis()); this.transactionStatus = TransactionStatus.PREPARE; @@ -338,8 +340,8 @@ public class TransactionState implements Writable { this.commitTime = -1; this.finishTime = -1; this.reason = ""; - this.errorReplicas = Sets.newHashSet(); - this.publishVersionTasks = Maps.newHashMap(); + this.errorReplicas = new LongOpenHashSet(); + this.publishVersionTasks = new Long2ObjectOpenHashMap<>(); this.hasSendTask = false; this.visibleLatch = 
new CountDownLatch(1);
     }
@@ -351,7 +353,7 @@ public class TransactionState implements Writable {
         this.transactionId = transactionId;
         this.label = label;
         this.requestId = requestId;
-        this.idToTableCommitInfos = Maps.newHashMap();
+        this.idToTableCommitInfos = new Long2ObjectOpenHashMap<>();
         this.txnCoordinator = txnCoordinator;
         this.transactionStatus = TransactionStatus.PREPARE;
         this.sourceType = sourceType;
@@ -360,8 +362,8 @@ public class TransactionState implements Writable {
         this.commitTime = -1;
         this.finishTime = -1;
         this.reason = "";
-        this.errorReplicas = Sets.newHashSet();
-        this.publishVersionTasks = Maps.newHashMap();
+        this.errorReplicas = new LongOpenHashSet();
+        this.publishVersionTasks = new Long2ObjectOpenHashMap<>();
         this.hasSendTask = false;
         this.visibleLatch = new CountDownLatch(1);
         this.callbackId = callbackId;
@@ -389,7 +391,13 @@ public class TransactionState implements Writable {
     }
 
     public void setErrorReplicas(Set<Long> newErrorReplicas) {
-        this.errorReplicas = newErrorReplicas;
+        if (newErrorReplicas == null || newErrorReplicas instanceof LongOpenHashSet) {
+            // Adopt the caller's primitive set as-is; null stays null, as with the previous setter.
+            this.errorReplicas = (LongOpenHashSet) newErrorReplicas;
+        } else {
+            // Any other Set implementation is copied into a primitive-backed set.
+            this.errorReplicas = new LongOpenHashSet(newErrorReplicas);
+        }
     }
 
     public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
@@ -621,7 +629,7 @@ public class TransactionState implements Writable {
         this.reason = Strings.nullToEmpty(reason);
     }
 
-    public Set<Long> getErrorReplicas() {
+    public LongOpenHashSet getErrorReplicas() {
         return this.errorReplicas;
     }
 
@@ -690,7 +698,7 @@ public class TransactionState implements Writable {
     }
 
     public synchronized void addTableIndexes(OlapTable table) {
-        Set<Long> indexIds = loadedTblIndexes.computeIfAbsent(table.getId(), k -> Sets.newHashSet());
+        Set<Long> indexIds = loadedTblIndexes.computeIfAbsent(table.getId(), k -> new LongOpenHashSet());
         // always equal the index ids
         indexIds.clear();
         indexIds.addAll(table.getIndexIdToMeta().keySet());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java new file mode 100644 index 00000000000..4bc749d1501 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common; + +import com.google.gson.Gson; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; +import org.apache.doris.persist.gson.GsonUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +class ConcurrentLong2LongHashMapTest { + + @Test + void testPutAndGet() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + Assertions.assertEquals(100L, map.get(1L)); + map.put(1L, 200L); + Assertions.assertEquals(200L, map.get(1L)); + } + + @Test + void testGetMissingKeyReturnsDefaultReturnValue() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + // Default return value is 0L + Assertions.assertEquals(0L, map.get(999L)); + } + + @Test + void testGetOrDefault() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + Assertions.assertEquals(100L, map.getOrDefault(1L, -1L)); + Assertions.assertEquals(-1L, map.getOrDefault(2L, -1L)); + } + + @Test + void testRemove() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + Assertions.assertEquals(100L, map.remove(1L)); + Assertions.assertFalse(map.containsKey(1L)); + // Remove non-existent key returns defaultReturnValue + Assertions.assertEquals(0L, map.remove(1L)); + } + + @Test + void testContainsKey() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Assertions.assertFalse(map.containsKey(1L)); + 
map.put(1L, 0L); + Assertions.assertTrue(map.containsKey(1L)); + } + + @Test + void testContainsValue() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + Assertions.assertTrue(map.containsValue(100L)); + Assertions.assertFalse(map.containsValue(300L)); + } + + @Test + void testSizeAndIsEmpty() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Assertions.assertTrue(map.isEmpty()); + Assertions.assertEquals(0, map.size()); + map.put(1L, 100L); + map.put(2L, 200L); + Assertions.assertFalse(map.isEmpty()); + Assertions.assertEquals(2, map.size()); + } + + @Test + void testClear() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + map.clear(); + Assertions.assertTrue(map.isEmpty()); + Assertions.assertEquals(0, map.size()); + } + + @Test + void testPutAll() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Map<Long, Long> source = new HashMap<>(); + source.put(1L, 100L); + source.put(2L, 200L); + source.put(3L, 300L); + map.putAll(source); + Assertions.assertEquals(3, map.size()); + Assertions.assertEquals(200L, map.get(2L)); + } + + @Test + void testPutIfAbsent() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Assertions.assertEquals(0L, map.putIfAbsent(1L, 100L)); + Assertions.assertEquals(100L, map.putIfAbsent(1L, 200L)); + Assertions.assertEquals(100L, map.get(1L)); + } + + @Test + void testComputeIfAbsent() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + long val = map.computeIfAbsent(1L, k -> k * 10); + Assertions.assertEquals(10L, val); + // Should not recompute + long val2 = map.computeIfAbsent(1L, k -> k * 20); + Assertions.assertEquals(10L, val2); + } + + // ---- addTo tests ---- + + @Test + void testAddToNewKey() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + long result = map.addTo(1L, 5L); + 
Assertions.assertEquals(5L, result); + Assertions.assertEquals(5L, map.get(1L)); + } + + @Test + void testAddToExistingKey() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 10L); + long result = map.addTo(1L, 5L); + Assertions.assertEquals(15L, result); + Assertions.assertEquals(15L, map.get(1L)); + } + + @Test + void testAddToNegative() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 10L); + long result = map.addTo(1L, -3L); + Assertions.assertEquals(7L, result); + } + + // ---- Iteration tests ---- + + @Test + void testEntrySet() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + + ObjectSet<Long2LongMap.Entry> entries = map.long2LongEntrySet(); + Assertions.assertEquals(2, entries.size()); + + Set<Long> keys = new HashSet<>(); + for (Long2LongMap.Entry entry : entries) { + keys.add(entry.getLongKey()); + } + Assertions.assertTrue(keys.contains(1L)); + Assertions.assertTrue(keys.contains(2L)); + } + + @Test + void testKeySet() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(10L, 100L); + map.put(20L, 200L); + LongSet keys = map.keySet(); + Assertions.assertEquals(2, keys.size()); + Assertions.assertTrue(keys.contains(10L)); + Assertions.assertTrue(keys.contains(20L)); + } + + @Test + void testValues() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + it.unimi.dsi.fastutil.longs.LongCollection values = map.values(); + Assertions.assertEquals(2, values.size()); + Assertions.assertTrue(values.contains(100L)); + Assertions.assertTrue(values.contains(200L)); + } + + @Test + void testForEach() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + Map<Long, Long> collected = new HashMap<>(); + map.forEach(collected::put); + Assertions.assertEquals(2, collected.size()); + 
Assertions.assertEquals(100L, (long) collected.get(1L)); + } + + // ---- Large map test ---- + + @Test + void testLargeMap() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int count = 100_000; + for (long i = 0; i < count; i++) { + map.put(i, i * 3); + } + Assertions.assertEquals(count, map.size()); + for (long i = 0; i < count; i++) { + Assertions.assertEquals(i * 3, map.get(i)); + } + } + + @Test + void testCustomSegmentCount() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(4); + for (long i = 0; i < 1000; i++) { + map.put(i, i); + } + Assertions.assertEquals(1000, map.size()); + } + + @Test + void testInvalidSegmentCount() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2LongHashMap(3)); + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2LongHashMap(0)); + } + + // ---- Concurrency tests ---- + + @Test + void testConcurrentPuts() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int threads = 8; + int keysPerThread = 10_000; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + for (int i = 0; i < keysPerThread; i++) { + long key = (long) threadId * keysPerThread + i; + map.put(key, key * 2); + } + latch.countDown(); + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(threads * keysPerThread, map.size()); + } + + @Test + void testConcurrentAddTo() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int threads = 16; + int incrementsPerThread = 10_000; + long key = 42L; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + + for (int t = 0; t < threads; t++) { + executor.submit(() -> { + for (int i = 0; i < 
incrementsPerThread; i++) { + map.addTo(key, 1L); + } + latch.countDown(); + }); + } + latch.await(); + executor.shutdown(); + + Assertions.assertEquals((long) threads * incrementsPerThread, map.get(key)); + } + + @Test + void testConcurrentReadWrite() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + for (long i = 0; i < 1000; i++) { + map.put(i, i); + } + + int threads = 8; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + AtomicInteger errors = new AtomicInteger(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < 5000; i++) { + long key = i % 1000; + if (threadId % 2 == 0) { + map.get(key); + map.containsKey(key); + } else { + map.put(key + 1000L * threadId, (long) i); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(0, errors.get()); + } + + @Test + void testConcurrentComputeIfAbsent() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int threads = 16; + long sharedKey = 42L; + AtomicInteger computeCount = new AtomicInteger(); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List<Future<Long>> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + futures.add(executor.submit(() -> + map.computeIfAbsent(sharedKey, k -> { + computeCount.incrementAndGet(); + return k * 10; + }) + )); + } + Set<Long> results = new HashSet<>(); + for (Future<Long> f : futures) { + results.add(f.get()); + } + executor.shutdown(); + + Assertions.assertEquals(1, results.size()); + Assertions.assertTrue(results.contains(420L)); + Assertions.assertEquals(1, computeCount.get()); + } + + @Test + void testConcurrentIterationDuringModification() throws Exception { + ConcurrentLong2LongHashMap map = new 
ConcurrentLong2LongHashMap(); + for (long i = 0; i < 1000; i++) { + map.put(i, i); + } + + int threads = 4; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + AtomicInteger errors = new AtomicInteger(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < 100; i++) { + if (threadId % 2 == 0) { + map.keySet(); + map.values(); + map.long2LongEntrySet(); + } else { + map.put(1000L + threadId * 100 + i, (long) i); + map.remove((long) (i % 500)); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(0, errors.get()); + } + + // ---- Gson serialization tests ---- + + @Test + void testGsonRoundTrip() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(100L, 1000L); + map.put(200L, 2000L); + + String json = GsonUtils.GSON.toJson(map); + + ConcurrentLong2LongHashMap deserialized = GsonUtils.GSON.fromJson(json, ConcurrentLong2LongHashMap.class); + + Assertions.assertEquals(2, deserialized.size()); + Assertions.assertEquals(1000L, deserialized.get(100L)); + Assertions.assertEquals(2000L, deserialized.get(200L)); + } + + @Test + void testGsonFormatCompatibleWithConcurrentHashMap() { + ConcurrentHashMap<Long, Long> chm = new ConcurrentHashMap<>(); + chm.put(1L, 100L); + chm.put(2L, 200L); + String chmJson = new Gson().toJson(chm); + + ConcurrentLong2LongHashMap fastMap = new ConcurrentLong2LongHashMap(); + fastMap.put(1L, 100L); + fastMap.put(2L, 200L); + String fastJson = GsonUtils.GSON.toJson(fastMap); + + Gson gson = new Gson(); + Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class); + Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class); + Assertions.assertEquals(chmParsed, fastParsed); + } + + @Test + void testDefaultReturnValueBehavior() { + ConcurrentLong2LongHashMap map = 
new ConcurrentLong2LongHashMap(); + // Primitive get returns 0L (defaultReturnValue) for missing keys + Assertions.assertEquals(0L, map.get(999L)); + + // Store 0L explicitly + map.put(1L, 0L); + Assertions.assertTrue(map.containsKey(1L)); + Assertions.assertEquals(0L, map.get(1L)); + + // Boxed get via Map<Long,Long> interface returns null for missing keys + Long boxedResult = map.getOrDefault(999L, map.defaultReturnValue()); + Assertions.assertEquals(0L, boxedResult); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java new file mode 100644 index 00000000000..88f1a45e7e3 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package org.apache.doris.common;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.LongSet;
import it.unimi.dsi.fastutil.objects.ObjectCollection;
import it.unimi.dsi.fastutil.objects.ObjectSet;
import org.apache.doris.persist.gson.GsonUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Unit tests for {@link ConcurrentLong2ObjectHashMap}: single-threaded map operations,
 * multi-threaded access, and Gson serialization through {@link GsonUtils#GSON}.
 */
class ConcurrentLong2ObjectHashMapTest {

    /** Work executed by one worker thread; throwing from it fails the calling test. */
    @FunctionalInterface
    private interface Worker {
        void run(int threadId) throws Exception;
    }

    /**
     * Runs {@code worker} once per thread id in {@code [0, threads)} on a dedicated pool and
     * waits for every worker to finish.
     *
     * <p>All workers are released together through a start gate so they overlap as much as
     * possible. Any exception thrown by a worker is rethrown to the caller (wrapped in an
     * {@link java.util.concurrent.ExecutionException}) instead of being dropped, and the pool is
     * always shut down, so a failing worker can neither hang the test nor leak threads.
     *
     * @param threads number of worker threads to start
     * @param worker  body to run on each thread; receives the zero-based thread id
     * @throws Exception if any worker fails or the calling thread is interrupted
     */
    private static void runConcurrently(int threads, Worker worker) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch startGate = new CountDownLatch(1);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int threadId = t;
                futures.add(executor.submit(() -> {
                    startGate.await();
                    worker.run(threadId);
                    return null;
                }));
            }
            startGate.countDown();
            for (Future<?> future : futures) {
                // get() surfaces the worker's exception together with its cause.
                future.get();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    // ---- Basic operation tests ----

    @Test
    void testPutAndGet() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        Assertions.assertNull(map.put(1L, "one"));
        Assertions.assertEquals("one", map.get(1L));
        Assertions.assertEquals("one", map.put(1L, "ONE"));
        Assertions.assertEquals("ONE", map.get(1L));
    }

    @Test
    void testGetMissingKey() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        Assertions.assertNull(map.get(999L));
    }

    @Test
    void testGetOrDefault() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        Assertions.assertEquals("one", map.getOrDefault(1L, "default"));
        Assertions.assertEquals("default", map.getOrDefault(2L, "default"));
    }

    @Test
    void testRemove() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        Assertions.assertEquals("one", map.remove(1L));
        Assertions.assertNull(map.get(1L));
        Assertions.assertNull(map.remove(1L));
    }

    @Test
    void testContainsKey() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        Assertions.assertFalse(map.containsKey(1L));
        map.put(1L, "one");
        Assertions.assertTrue(map.containsKey(1L));
    }

    @Test
    void testContainsValue() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        map.put(2L, "two");
        Assertions.assertTrue(map.containsValue("one"));
        Assertions.assertFalse(map.containsValue("three"));
    }

    @Test
    void testSizeAndIsEmpty() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        Assertions.assertTrue(map.isEmpty());
        Assertions.assertEquals(0, map.size());
        map.put(1L, "one");
        map.put(2L, "two");
        Assertions.assertFalse(map.isEmpty());
        Assertions.assertEquals(2, map.size());
    }

    @Test
    void testClear() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        map.put(2L, "two");
        map.clear();
        Assertions.assertTrue(map.isEmpty());
        Assertions.assertEquals(0, map.size());
    }

    @Test
    void testPutAll() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        Map<Long, String> source = new HashMap<>();
        source.put(1L, "one");
        source.put(2L, "two");
        source.put(3L, "three");
        map.putAll(source);
        Assertions.assertEquals(3, map.size());
        Assertions.assertEquals("two", map.get(2L));
    }

    @Test
    void testPutIfAbsent() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        Assertions.assertNull(map.putIfAbsent(1L, "one"));
        Assertions.assertEquals("one", map.putIfAbsent(1L, "ONE"));
        Assertions.assertEquals("one", map.get(1L));
    }

    @Test
    void testComputeIfAbsent() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        String val = map.computeIfAbsent(1L, k -> "computed-" + k);
        Assertions.assertEquals("computed-1", val);
        // Should not recompute
        String val2 = map.computeIfAbsent(1L, k -> "recomputed-" + k);
        Assertions.assertEquals("computed-1", val2);
    }

    @Test
    void testComputeIfPresent() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        // Not present - should return null
        Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> v + "-updated"));

        map.put(1L, "one");
        String val = map.computeIfPresent(1L, (k, v) -> v + "-updated");
        Assertions.assertEquals("one-updated", val);
        Assertions.assertEquals("one-updated", map.get(1L));

        // Return null to remove
        Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> null));
        Assertions.assertFalse(map.containsKey(1L));
    }

    // ---- Iteration tests ----

    @Test
    void testEntrySet() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        map.put(2L, "two");

        ObjectSet<Long2ObjectMap.Entry<String>> entries = map.long2ObjectEntrySet();
        Assertions.assertEquals(2, entries.size());

        Set<Long> keys = new HashSet<>();
        for (Long2ObjectMap.Entry<String> entry : entries) {
            keys.add(entry.getLongKey());
        }
        Assertions.assertTrue(keys.contains(1L));
        Assertions.assertTrue(keys.contains(2L));
    }

    @Test
    void testKeySet() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(10L, "ten");
        map.put(20L, "twenty");
        LongSet keys = map.keySet();
        Assertions.assertEquals(2, keys.size());
        Assertions.assertTrue(keys.contains(10L));
        Assertions.assertTrue(keys.contains(20L));
    }

    @Test
    void testValues() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        map.put(2L, "two");
        ObjectCollection<String> values = map.values();
        Assertions.assertEquals(2, values.size());
        Assertions.assertTrue(values.contains("one"));
        Assertions.assertTrue(values.contains("two"));
    }

    @Test
    void testStream() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        for (long i = 0; i < 100; i++) {
            map.put(i, "val-" + i);
        }
        long count = map.values().stream().filter(v -> v.startsWith("val-")).count();
        Assertions.assertEquals(100, count);
    }

    @Test
    void testForEach() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, "one");
        map.put(2L, "two");
        Map<Long, String> collected = new HashMap<>();
        map.forEach(collected::put);
        Assertions.assertEquals(2, collected.size());
        Assertions.assertEquals("one", collected.get(1L));
    }

    @Test
    void testNullValues() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(1L, null);
        Assertions.assertTrue(map.containsKey(1L));
        Assertions.assertNull(map.get(1L));
    }

    // ---- Large map and segment-count tests ----

    @Test
    void testLargeMap() {
        ConcurrentLong2ObjectHashMap<Long> map = new ConcurrentLong2ObjectHashMap<>();
        int count = 100_000;
        for (long i = 0; i < count; i++) {
            map.put(i, i * 2);
        }
        Assertions.assertEquals(count, map.size());
        for (long i = 0; i < count; i++) {
            Assertions.assertEquals(Long.valueOf(i * 2), map.get(i));
        }
    }

    @Test
    void testCustomSegmentCount() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(4);
        for (long i = 0; i < 1000; i++) {
            map.put(i, "v" + i);
        }
        Assertions.assertEquals(1000, map.size());
    }

    @Test
    void testInvalidSegmentCount() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2ObjectHashMap<>(3));
        Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2ObjectHashMap<>(0));
        Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2ObjectHashMap<>(-1));
    }

    // ---- Concurrency tests ----

    @Test
    void testConcurrentPuts() throws Exception {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        int threads = 8;
        int keysPerThread = 10_000;

        // Each thread writes its own disjoint key range, so no put may be lost.
        runConcurrently(threads, threadId -> {
            for (int i = 0; i < keysPerThread; i++) {
                long key = (long) threadId * keysPerThread + i;
                map.put(key, "t" + threadId + "-" + i);
            }
        });

        Assertions.assertEquals(threads * keysPerThread, map.size());
    }

    @Test
    void testConcurrentReadWrite() throws Exception {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        // Pre-populate
        for (long i = 0; i < 1000; i++) {
            map.put(i, "v" + i);
        }

        int threads = 8;
        runConcurrently(threads, threadId -> {
            for (int i = 0; i < 5000; i++) {
                long key = i % 1000;
                if (threadId % 2 == 0) {
                    // Reader
                    map.get(key);
                    map.containsKey(key);
                } else {
                    // Writer
                    map.put(key + 1000L * threadId, "new-" + i);
                }
            }
        });

        // 1000 pre-populated keys plus 1000 distinct keys from each of the 4 writer threads
        // (ids 1, 3, 5, 7 write to disjoint ranges above the pre-populated one).
        Assertions.assertEquals(5000, map.size());
    }

    @Test
    void testConcurrentComputeIfAbsent() throws Exception {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        int threads = 16;
        long sharedKey = 42L;
        AtomicInteger computeCount = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Set<String> results = new HashSet<>();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                futures.add(executor.submit(() ->
                        map.computeIfAbsent(sharedKey, k -> {
                            computeCount.incrementAndGet();
                            return "computed";
                        })
                ));
            }
            for (Future<String> f : futures) {
                results.add(f.get());
            }
        } finally {
            // Release the pool even when a worker failed and get() threw.
            executor.shutdownNow();
        }

        // All threads should get the same value
        Assertions.assertEquals(1, results.size());
        Assertions.assertTrue(results.contains("computed"));
        // The function should have been called exactly once
        Assertions.assertEquals(1, computeCount.get());
    }

    @Test
    void testConcurrentIterationDuringModification() throws Exception {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        for (long i = 0; i < 1000; i++) {
            map.put(i, "v" + i);
        }

        int threads = 4;
        runConcurrently(threads, threadId -> {
            for (int i = 0; i < 100; i++) {
                if (threadId % 2 == 0) {
                    // Obtaining the key/value/entry collections must not throw while other
                    // threads modify the map.
                    // NOTE(review): the returned collections are not traversed here; whether
                    // they are snapshots or live views is not visible from this test. Confirm
                    // against the implementation before adding traversal.
                    map.keySet();
                    map.values();
                    map.long2ObjectEntrySet();
                } else {
                    // Modifier
                    map.put(1000L + threadId * 100 + i, "new");
                    map.remove((long) (i % 500));
                }
            }
        });

        // Modifier threads (ids 1 and 3) remove keys 0..99 and add 100 new keys each
        // (1100..1199 and 1300..1399): 1000 - 100 + 200.
        Assertions.assertEquals(1100, map.size());
    }

    // ---- Gson serialization tests ----

    @Test
    void testGsonRoundTrip() {
        ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>();
        map.put(100L, "hundred");
        map.put(200L, "two-hundred");

        String json = GsonUtils.GSON.toJson(map);

        Type type = new TypeToken<ConcurrentLong2ObjectHashMap<String>>() {}.getType();
        ConcurrentLong2ObjectHashMap<String> deserialized = GsonUtils.GSON.fromJson(json, type);

        Assertions.assertEquals(2, deserialized.size());
        Assertions.assertEquals("hundred", deserialized.get(100L));
        Assertions.assertEquals("two-hundred", deserialized.get(200L));
    }

    @Test
    void testGsonFormatCompatibleWithConcurrentHashMap() {
        // Verify the JSON format matches what ConcurrentHashMap<Long, String> produces
        ConcurrentHashMap<Long, String> chm = new ConcurrentHashMap<>();
        chm.put(1L, "one");
        chm.put(2L, "two");
        String chmJson = new Gson().toJson(chm);

        ConcurrentLong2ObjectHashMap<String> fastMap = new ConcurrentLong2ObjectHashMap<>();
        fastMap.put(1L, "one");
        fastMap.put(2L, "two");
        String fastJson = GsonUtils.GSON.toJson(fastMap);

        // Both should be parseable as the same JSON object
        Gson gson = new Gson();
        Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class);
        Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class);
        Assertions.assertEquals(chmParsed, fastParsed);
    }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
