This is an automated email from the ASF dual-hosted git repository.

yx-keith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 962b88fe624 [fix](fe) avoid concurrent tablet stat iteration failures (#63298)
962b88fe624 is described below

commit 962b88fe62400d67ba4fd91c45f1c288c74c9856
Author: yaoxiao <[email protected]>
AuthorDate: Sat May 23 16:01:35 2026 +0800

    [fix](fe) avoid concurrent tablet stat iteration failures (#63298)
    
    ### What problem does this PR solve?
    
    Issue Number: #59138
    
    Problem Summary:
    
    TabletStatMgr.runAfterCatalogReady() is a periodic master-FE daemon that
    iterates every tablet/replica to pull statistics. When DDL runs
    concurrently with this daemon, two races fire:
    
    1. Iteration race (CME). MaterializedIndex.tablets and LocalTablet.replicas
       were plain ArrayLists whose getters returned the internal list. A
       concurrent addTablet / addReplica / deleteReplica (clone, repair, schema
       change, restore, report handler) during iteration triggered the
       fail-fast iterator and threw ConcurrentModificationException.
    2. TOCTOU race. In updateTabletStat, a getTabletMeta(id) != null check is
       followed by getReplica(id, beId). If the tablet is removed in between,
       getReplica hits Preconditions.checkState(...) and throws
       IllegalStateException.
    
    When the daemon throws, the current cycle leaves stale
    tablet/partition/table sizes and skewed MetricRepo metrics until the
    next cycle.
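    
    The iteration race can be reproduced deterministically in a single
    thread. The sketch below is illustrative only: LeakyHolder and
    FailFastDemo are hypothetical names, not Doris classes, and the
    "concurrent" writer is simulated by adding inside the loop.
    
    ```java
    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.List;
    
    // Hypothetical stand-in for a class whose getter leaks its internal ArrayList.
    class LeakyHolder {
        private final List<Integer> items = new ArrayList<>();
    
        LeakyHolder(int n) {
            for (int i = 0; i < n; i++) {
                items.add(i);
            }
        }
    
        List<Integer> getItems() {
            return items; // the live internal list, not a copy
        }
    
        void add(int v) {
            items.add(v);
        }
    }
    
    class FailFastDemo {
        // Returns true if a structural change during iteration trips the
        // fail-fast iterator.
        static boolean iterationFails() {
            LeakyHolder holder = new LeakyHolder(3);
            try {
                for (Integer ignored : holder.getItems()) {
                    holder.add(99); // simulates a writer racing with the reader
                }
                return false;
            } catch (ConcurrentModificationException e) {
                return true;
            }
        }
    }
    ```
    
    With real threads the failure is timing-dependent, which is why it only
    showed up intermittently under concurrent DDL.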
    
    Solution:
    
    Close the CME race for good with copy-on-write via a volatile snapshot.
    A first attempt returned a defensive copy (Lists.newArrayList(...)), but
    the copy itself iterates the source list and can still CME mid-copy;
    the window shrank but did not close. This PR instead:
    
    - Makes LocalTablet.replicas and MaterializedIndex.tablets volatile.
    - Writers (addReplica / deleteReplica / deleteReplicaByBackendId /
      addTablet / clearTabletsForRestore) are synchronized, build a new list,
      and atomically swap the volatile reference; they never mutate a list in
      place.
    - Readers (getReplicas() / getTablets()) do a single volatile read and
      return an immutable snapshot (Collections.unmodifiableList). Iteration
      is lock-free and can never CME, and the hot read path no longer copies
      elements.
    
    synchronized on writers is required (not just volatile) because some
    write paths do not hold the OlapTable write lock. This was verified by
    tracing call sites: InternalCatalog.createPartitionWithIndices and
    RestoreJob.resetPartitionForRestore call addReplica/addTablet without
    the table write lock, so concurrent writers are real and a plain
    volatile field would allow lost updates. Writers are infrequent (DDL /
    repair / restore), so the lock cost is negligible; reads stay lock-free.
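    
    A minimal sketch of the copy-on-write pattern described above.
    SnapshotHolder and SnapshotDemo are hypothetical names used for
    illustration; this mirrors the shape of the change, not the actual
    LocalTablet / MaterializedIndex code.
    
    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    // Hypothetical holder illustrating the volatile-snapshot pattern.
    class SnapshotHolder {
        private volatile List<String> items = new ArrayList<>();
    
        // Writer: synchronized so two writers cannot both copy the same old
        // list and lose one update; the published list is never mutated.
        synchronized void add(String item) {
            List<String> next = new ArrayList<>(items.size() + 1);
            next.addAll(items);
            next.add(item);
            items = next; // volatile write publishes the new snapshot
        }
    
        // Reader: one volatile read, wrapped read-only; no copy, no lock.
        List<String> getItems() {
            return Collections.unmodifiableList(items);
        }
    }
    
    class SnapshotDemo {
        // Iterates a snapshot while writing; returns how many elements the
        // reader saw.
        static int iterateWhileWriting() {
            SnapshotHolder holder = new SnapshotHolder();
            holder.add("a");
            holder.add("b");
            int seen = 0;
            for (String ignored : holder.getItems()) {
                holder.add("x"); // swaps the reference; the iterated list is untouched
                seen++;
            }
            return seen;
        }
    }
    ```
    
    The reader keeps iterating the list it captured, so writes made during
    the loop become visible only to the next getItems() call.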
    
    TOCTOU race is handled by catching IllegalStateException around
    getReplica (kept from the original fix) and counting the skip via a new
    TabletStatMgr.staleTabletStatSkipped counter, which makes the race
    observable (>0 proves the window was actually hit) instead of relying
    solely on log scraping.
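    
    The catch-and-count handling can be sketched as follows. FakeIndex and
    StatUpdater are hypothetical stand-ins (the real code uses
    TabletInvertedIndex and TabletStatMgr), and the AtomicLong counter is an
    assumption about how such a counter could be kept.
    
    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    
    // Hypothetical index whose lookup throws when the tablet is gone,
    // standing in for a checkState-style precondition.
    class FakeIndex {
        private final Map<Long, String> replicas = new ConcurrentHashMap<>();
    
        void put(long tabletId, String replica) {
            replicas.put(tabletId, replica);
        }
    
        void remove(long tabletId) {
            replicas.remove(tabletId);
        }
    
        String getReplica(long tabletId) {
            String r = replicas.get(tabletId);
            if (r == null) {
                throw new IllegalStateException("tablet " + tabletId + " does not exist");
            }
            return r;
        }
    }
    
    class StatUpdater {
        // Counts skipped updates so the race is visible as a number.
        static final AtomicLong staleSkipped = new AtomicLong();
    
        // Returns how many of the reported tablet ids were applied.
        static int apply(FakeIndex index, long[] reportedTabletIds) {
            int applied = 0;
            for (long id : reportedTabletIds) {
                try {
                    index.getReplica(id);
                } catch (IllegalStateException e) {
                    staleSkipped.incrementAndGet(); // tablet vanished after the check
                    continue;
                }
                applied++;
            }
            return applied;
        }
    }
    ```
    
    One missing tablet no longer aborts the whole cycle: the remaining
    entries are still applied and the skip is recorded.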
    
    Cloud path: CloudTabletStatMgr.updateStatInfo iterates
    tablet.getReplicas() and is covered by the same snapshot fix; its
    updateTabletStat uses getReplicasByTabletId (locked, returns empty list,
    no checkState) and is already safe.
    
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
    
    ---------
    
    Co-authored-by: morningman <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../doris/alter/MaterializedViewHandler.java       | 17 +++-
 .../apache/doris/alter/SchemaChangeHandler.java    | 19 ++++-
 .../java/org/apache/doris/backup/RestoreJob.java   |  9 +-
 .../apache/doris/catalog/CloudTabletStatMgr.java   |  1 +
 .../java/org/apache/doris/catalog/LocalTablet.java | 96 +++++++++++++---------
 .../apache/doris/catalog/MaterializedIndex.java    | 89 ++++++++++++++++----
 .../java/org/apache/doris/catalog/OlapTable.java   |  7 +-
 .../org/apache/doris/catalog/TabletStatMgr.java    | 18 +++-
 .../apache/doris/cloud/backup/CloudRestoreJob.java |  8 +-
 .../cloud/datasource/CloudInternalCatalog.java     | 12 ++-
 .../apache/doris/datasource/InternalCatalog.java   | 14 +++-
 .../doris/catalog/MaterializedIndexTest.java       | 65 +++++++++++++++
 .../java/org/apache/doris/catalog/TabletTest.java  | 79 ++++++++++++++++++
 13 files changed, 360 insertions(+), 74 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index f875c807847..700fad2f552 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -443,14 +443,22 @@ public class MaterializedViewHandler extends AlterHandler {
             MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, 
IndexState.SHADOW);
             MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
             short replicationNum = 
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
+            // All MV tablets of the same (partition, mv index) share the same 
TabletMeta;
+            // build it once and bulk-publish to MaterializedIndex.tablets 
after the per-tablet
+            // loop to keep copy-on-write O(n). TabletInvertedIndex 
registration stays
+            // per-iteration because Tablet.addReplica(...) below needs the 
tablet present
+            // in the inverted index.
+            TabletMeta mvTabletMeta = new TabletMeta(
+                    dbId, tableId, partitionId, mvIndexId, mvSchemaHash, 
medium);
+            List<Tablet> mvTabletsForPartition = 
Lists.newArrayListWithCapacity(baseIndex.getTablets().size());
+            TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
             for (Tablet baseTablet : baseIndex.getTablets()) {
-                TabletMeta mvTabletMeta = new TabletMeta(
-                        dbId, tableId, partitionId, mvIndexId, mvSchemaHash, 
medium);
                 long baseTabletId = baseTablet.getId();
                 long mvTabletId = idGeneratorBuffer.getNextId();
 
                 Tablet newTablet = 
EnvFactory.getInstance().createTablet(mvTabletId);
-                mvIndex.addTablet(newTablet, mvTabletMeta);
+                invertedIndex.addTablet(mvTabletId, mvTabletMeta);
+                mvTabletsForPartition.add(newTablet);
                 addedTablets.add(newTablet);
 
                 mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
@@ -505,6 +513,9 @@ public class MaterializedViewHandler extends AlterHandler {
                 }
             } // end for baseTablets
 
+            // Bulk-publish all MV tablets for this partition in one 
copy-on-write.
+            mvIndex.appendTablets(mvTabletsForPartition);
+
             mvJob.addMVIndex(partitionId, mvIndex);
 
             if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 659908873ef..4f56b4aa94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -54,6 +54,7 @@ import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.catalog.info.ColumnPosition;
 import org.apache.doris.catalog.info.IndexType;
@@ -2050,14 +2051,23 @@ public class SchemaChangeHandler extends AlterHandler {
                 MaterializedIndex originIndex = 
partition.getIndex(originIndexId);
                 ReplicaAllocation replicaAlloc = 
olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
                 Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+                // All shadow tablets of the same (partition, shadow index) 
share the same TabletMeta;
+                // build it once and bulk-publish to MaterializedIndex.tablets 
after the per-tablet
+                // loop to keep copy-on-write O(n). TabletInvertedIndex 
registration stays
+                // per-iteration because Tablet.addReplica(...) below needs 
the tablet present
+                // in the inverted index.
+                TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, shadowIndexId,
+                        newSchemaHash, medium);
+                List<Tablet> shadowTabletsForPartition = 
Lists.newArrayListWithCapacity(
+                        originIndex.getTablets().size());
+                TabletInvertedIndex invertedIndex = 
Env.getCurrentInvertedIndex();
                 for (Tablet originTablet : originIndex.getTablets()) {
-                    TabletMeta shadowTabletMeta = new TabletMeta(dbId, 
tableId, partitionId, shadowIndexId,
-                            newSchemaHash, medium);
                     long originTabletId = originTablet.getId();
                     long shadowTabletId = idGeneratorBuffer.getNextId();
 
                     Tablet shadowTablet = 
EnvFactory.getInstance().createTablet(shadowTabletId);
-                    shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
+                    invertedIndex.addTablet(shadowTabletId, shadowTabletMeta);
+                    shadowTabletsForPartition.add(shadowTablet);
                     addedTablets.add(shadowTablet);
 
                     schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, 
shadowTabletId, originTabletId);
@@ -2115,6 +2125,9 @@ public class SchemaChangeHandler extends AlterHandler {
                     }
                 }
 
+                // Bulk-publish all shadow tablets for this partition in one 
copy-on-write.
+                shadowIndex.appendTablets(shadowTabletsForPartition);
+
                 schemaChangeJob.addPartitionShadowIndex(partitionId, 
shadowIndexId, shadowIndex);
             } // end for partition
             schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId, 
newIndexName, newSchemaVersion, newSchemaHash,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 528cbff5018..0aaa7140aaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -109,6 +109,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1553,12 +1554,13 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
             int schemaHash = 
remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
             int remotetabletSize = remoteIdx.getTablets().size();
             remoteIdx.clearTabletsForRestore();
+            // Collect locally and bulk-publish to keep copy-on-write O(n) for 
the whole index.
+            List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
             for (int i = 0; i < remotetabletSize; i++) {
                 // generate new tablet id
                 long newTabletId = env.getNextId();
                 Tablet newTablet = 
EnvFactory.getInstance().createTablet(newTabletId);
-                // add tablet to index, but not add to TabletInvertedIndex
-                remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* 
is restore */);
+                newTablets.add(newTablet);
                 // replicas
                 try {
                     Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium 
= Env.getCurrentSystemInfo()
@@ -1577,6 +1579,9 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
                     return null;
                 }
             }
+            // add tablets to index in one batch; TabletInvertedIndex 
registration
+            // is intentionally skipped on the restore path (rebuilt 
separately).
+            remoteIdx.appendTablets(newTablets);
         }
         return remotePart;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index 6765a001492..118aa401227 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -366,6 +366,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                                 long tabletIndexSize = 0L;
                                 long tabletSegmentSize = 0L;
 
+                                // getReplicas() returns an immutable volatile 
snapshot; CME-safe under concurrent DDL.
                                 for (Replica replica : tablet.getReplicas()) {
                                     if (replica.getDataSize() > 
tabletDataSize) {
                                         tabletDataSize = replica.getDataSize();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
index 14d9171f3ff..5db0a1286b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
@@ -31,8 +31,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.LongStream;
 
@@ -40,7 +40,7 @@ public class LocalTablet extends Tablet {
     private static final Logger LOG = LogManager.getLogger(LocalTablet.class);
 
     @SerializedName(value = "rs", alternate = {"replicas"})
-    private List<Replica> replicas;
+    private volatile List<Replica> replicas;
     @SerializedName(value = "lastCheckTime")
     private long lastCheckTime;
 
@@ -109,13 +109,14 @@ public class LocalTablet extends Tablet {
         if (cooldownReplicaId <= 0) {
             return 0;
         }
-        for (Replica r : replicas) {
+        List<Replica> snapshot = replicas; // single volatile read; reuse below
+        for (Replica r : snapshot) {
             if (r.getId() == cooldownReplicaId) {
                 return r.getRemoteDataSize();
             }
         }
         // return replica with max remoteDataSize
-        return 
replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
+        return 
snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
     }
 
     @Override
@@ -223,29 +224,32 @@ public class LocalTablet extends Tablet {
         this.lastCheckTime = lastCheckTime;
     }
 
-    private boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
-        boolean delete = false;
+    // Writers are synchronized on this tablet to prevent concurrent 
lost-update:
+    // some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT 
hold
+    // the OlapTable write lock when modifying replicas.
+    // Readers capture the volatile reference once and iterate freely — no 
lock needed.
+
+    @Override
+    public synchronized void addReplica(Replica replica, boolean isRestore) {
+        long version = replica.getVersion();
+        long backendId = replica.getBackendIdWithoutException();
         boolean hasBackend = false;
-        long version = newReplica.getVersion();
-        Iterator<Replica> iterator = replicas.iterator();
-        while (iterator.hasNext()) {
-            Replica replica = iterator.next();
-            if (replica.getBackendIdWithoutException() == 
newReplica.getBackendIdWithoutException()) {
+        boolean deletedOld = false;
+        List<Replica> current = replicas;
+        List<Replica> next = new ArrayList<>(current.size() + 1);
+        for (Replica r : current) {
+            if (r.getBackendIdWithoutException() == backendId) {
                 hasBackend = true;
-                if (replica.getVersion() <= version) {
-                    iterator.remove();
-                    delete = true;
+                if (r.getVersion() <= version) {
+                    deletedOld = true;
+                    continue; // drop stale replica
                 }
             }
+            next.add(r);
         }
-
-        return delete || !hasBackend;
-    }
-
-    @Override
-    public void addReplica(Replica replica, boolean isRestore) {
-        if (isLatestReplicaAndDeleteOld(replica)) {
-            replicas.add(replica);
+        if (deletedOld || !hasBackend) {
+            next.add(replica);
+            replicas = next; // volatile write; readers see the new immutable 
snapshot
             if (!isRestore) {
                 Env.getCurrentInvertedIndex().addReplica(id, replica);
             }
@@ -254,12 +258,13 @@ public class LocalTablet extends Tablet {
 
     @Override
     public List<Replica> getReplicas() {
-        return this.replicas;
+        // Volatile read: returns the current immutable snapshot; callers 
iterate without locking.
+        return Collections.unmodifiableList(replicas);
     }
 
     @Override
     public Replica getReplicaByBackendId(long backendId) {
-        for (Replica replica : replicas) {
+        for (Replica replica : replicas) { // single volatile read
             if (replica.getBackendIdWithoutException() == backendId) {
                 return replica;
             }
@@ -268,9 +273,12 @@ public class LocalTablet extends Tablet {
     }
 
     @Override
-    public boolean deleteReplica(Replica replica) {
-        if (replicas.contains(replica)) {
-            replicas.remove(replica);
+    public synchronized boolean deleteReplica(Replica replica) {
+        List<Replica> current = replicas;
+        if (current.contains(replica)) {
+            List<Replica> next = new ArrayList<>(current);
+            next.remove(replica);
+            replicas = next; // volatile write
             Env.getCurrentInvertedIndex().deleteReplica(id, 
replica.getBackendIdWithoutException());
             return true;
         }
@@ -278,16 +286,22 @@ public class LocalTablet extends Tablet {
     }
 
     @Override
-    public boolean deleteReplicaByBackendId(long backendId) {
-        Iterator<Replica> iterator = replicas.iterator();
-        while (iterator.hasNext()) {
-            Replica replica = iterator.next();
+    public synchronized boolean deleteReplicaByBackendId(long backendId) {
+        List<Replica> current = replicas;
+        List<Replica> next = new ArrayList<>(current.size());
+        Replica found = null;
+        for (Replica replica : current) {
             if (replica.getBackendIdWithoutException() == backendId) {
-                iterator.remove();
-                Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
-                return true;
+                found = replica;
+            } else {
+                next.add(replica);
             }
         }
+        if (found != null) {
+            replicas = next; // volatile write
+            Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
+            return true;
+        }
         return false;
     }
 
@@ -302,13 +316,17 @@ public class LocalTablet extends Tablet {
 
         LocalTablet tablet = (LocalTablet) obj;
 
-        if (replicas != tablet.replicas) {
-            if (replicas.size() != tablet.replicas.size()) {
+        // Capture one snapshot per side so a concurrent writer cannot publish
+        // a different list between size/contains/get calls below.
+        List<Replica> thisReplicas = replicas;
+        List<Replica> otherReplicas = tablet.replicas;
+        if (thisReplicas != otherReplicas) {
+            if (thisReplicas.size() != otherReplicas.size()) {
                 return false;
             }
-            int size = replicas.size();
+            int size = thisReplicas.size();
             for (int i = 0; i < size; i++) {
-                if (!tablet.replicas.contains(replicas.get(i))) {
+                if (!otherReplicas.contains(thisReplicas.get(i))) {
                     return false;
                 }
             }
@@ -334,7 +352,7 @@ public class LocalTablet extends Tablet {
         }
 
         boolean allBeAliveOrDecommissioned = true;
-        for (Replica replica : replicas) {
+        for (Replica replica : replicas) { // single volatile read; iteration 
on the snapshot
             Backend backend = 
infoService.getBackend(replica.getBackendIdWithoutException());
             if (backend == null || (!backend.isAlive() && 
!backend.isDecommissioned())) {
                 allBeAliveOrDecommissioned = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
index 2e98354acef..f84ff018ef6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
@@ -19,10 +19,11 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.persist.gson.GsonPostProcessable;
 
-import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,10 +58,16 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
     @SerializedName(value = "rowCount")
     private long rowCount;
 
-    private Map<Long, Tablet> idToTablets;
+    // Published as a volatile immutable snapshot in lockstep with `tablets`.
+    // Writers (synchronized) build a fresh HashMap and assign the field; 
readers
+    // capture the reference once and call get/containsKey on the snapshot.
+    // Invariant: `tablets ⊆ idToTablets` — any tablet visible in the list is 
also
+    // present in the map. This is preserved by publishing the map BEFORE the 
list
+    // on add and the list BEFORE the map on clear.
+    private volatile Map<Long, Tablet> idToTablets;
     @SerializedName(value = "tablets")
     // this is for keeping tablet order
-    private List<Tablet> tablets;
+    private volatile List<Tablet> tablets;
 
     // for push after rollup index finished
     @SerializedName(value = "rollupIndexId")
@@ -94,38 +101,78 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
     }
 
     public List<Tablet> getTablets() {
-        return tablets;
+        // Volatile read: returns the current immutable snapshot; callers 
iterate without locking.
+        return Collections.unmodifiableList(tablets);
     }
 
     public List<Long> getTabletIdsInOrder() {
-        List<Long> tabletIds = Lists.newArrayListWithCapacity(tablets.size());
-        for (Tablet tablet : tablets) {
+        List<Tablet> snapshot = tablets; // single volatile read
+        List<Long> tabletIds = new ArrayList<>(snapshot.size());
+        for (Tablet tablet : snapshot) {
             tabletIds.add(tablet.getId());
         }
         return tabletIds;
     }
 
     public Tablet getTablet(long tabletId) {
+        // Single volatile read of the immutable map snapshot.
         return idToTablets.get(tabletId);
     }
 
-    public void clearTabletsForRestore() {
-        idToTablets.clear();
-        tablets.clear();
+    public synchronized void clearTabletsForRestore() {
+        // Drop the list first so iteration stops seeing tablets before
+        // lookup-by-id drops them. Maintains tablets ⊆ idToTablets.
+        tablets = new ArrayList<>();
+        idToTablets = new HashMap<>();
     }
 
-    public void addTablet(Tablet tablet, TabletMeta tabletMeta) {
+    public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) {
         addTablet(tablet, tabletMeta, false);
     }
 
-    public void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean 
isRestore) {
-        idToTablets.put(tablet.getId(), tablet);
-        tablets.add(tablet);
+    // Writers are synchronized on this index to prevent concurrent 
lost-update:
+    // some callers (e.g. InternalCatalog.createTablets) do NOT hold the 
OlapTable
+    // write lock when adding tablets.
+    // Copy-on-write keeps readers CME-safe without locking; for bulk creation 
use
+    // appendTablets(...) so the per-index tablets list is copied once per 
batch
+    // instead of once per tablet.
+    public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta, 
boolean isRestore) {
+        appendTabletsInternal(Collections.singletonList(tablet));
         if (!isRestore) {
             Env.getCurrentInvertedIndex().addTablet(tablet.getId(), 
tabletMeta);
         }
     }
 
+    // Bulk-publish: append the given tablets to this index's tablets list in a
+    // single copy-on-write (O(existing + batch) instead of O(n^2) over n
+    // single-tablet adds inside a synchronized block).
+    //
+    // Does NOT touch TabletInvertedIndex. Bulk-creation callers register 
tablets
+    // in TabletInvertedIndex eagerly inside their per-tablet loop because
+    // Tablet.addReplica(...) (non-restore) requires the tablet to already be
+    // present in the inverted index; only the per-index list copy is expensive
+    // enough to be worth batching.
+    public synchronized void appendTablets(Collection<Tablet> newTablets) {
+        appendTabletsInternal(newTablets);
+    }
+
+    private void appendTabletsInternal(Collection<Tablet> newTablets) {
+        if (newTablets.isEmpty()) {
+            return;
+        }
+        Map<Long, Tablet> nextMap = new HashMap<>(idToTablets);
+        List<Tablet> nextList = new ArrayList<>(tablets.size() + 
newTablets.size());
+        nextList.addAll(tablets);
+        for (Tablet tablet : newTablets) {
+            nextMap.put(tablet.getId(), tablet);
+            nextList.add(tablet);
+        }
+        // Publish the map first, then the list — so any id that appears in the
+        // visible `tablets` snapshot is already present in `idToTablets`.
+        idToTablets = nextMap;
+        tablets = nextList;
+    }
+
     public void setIdForRestore(long idxId) {
         this.id = idxId;
     }
@@ -241,8 +288,9 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
     }
 
     public int getTabletOrderIdx(long tabletId) {
+        List<Tablet> snapshot = tablets; // single volatile read
         int idx = 0;
-        for (Tablet tablet : tablets) {
+        for (Tablet tablet : snapshot) {
             if (tablet.getId() == tabletId) {
                 return idx;
             }
@@ -279,15 +327,16 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
 
     @Override
     public String toString() {
+        List<Tablet> snapshot = tablets; // single volatile read
         StringBuilder buffer = new StringBuilder();
         buffer.append("index id: ").append(id).append("; ");
         buffer.append("index state: ").append(state.name()).append("; ");
 
         buffer.append("row count: ").append(rowCount).append("; ");
-        buffer.append("tablets size: ").append(tablets.size()).append("; ");
+        buffer.append("tablets size: ").append(snapshot.size()).append("; ");
         //
         buffer.append("tablets: [");
-        for (Tablet tablet : tablets) {
+        for (Tablet tablet : snapshot) {
             buffer.append("tablet: ").append(tablet.toString()).append(", ");
         }
         buffer.append("]; ");
@@ -300,9 +349,13 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
 
     @Override
     public void gsonPostProcess() {
-        // build "idToTablets" from "tablets"
+        // Build a fresh "idToTablets" snapshot from the deserialized 
"tablets" list.
+        // Runs single-threaded during gson deserialization, before any 
concurrent
+        // reader can observe this object.
+        Map<Long, Tablet> map = new HashMap<>(tablets.size());
         for (Tablet tablet : tablets) {
-            idToTablets.put(tablet.getId(), tablet);
+            map.put(tablet.getId(), tablet);
         }
+        idToTablets = map;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 2482a0514e4..38cd24c9e2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -972,10 +972,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
                 // generate new tablets in origin tablet order
                 int tabletNum = idx.getTablets().size();
                 idx.clearTabletsForRestore();
+                // Collect locally and bulk-publish to keep copy-on-write O(n) 
for the whole index.
+                List<Tablet> newTablets = new ArrayList<>(tabletNum);
                 for (int i = 0; i < tabletNum; i++) {
                     long newTabletId = env.getNextId();
                     Tablet newTablet = 
EnvFactory.getInstance().createTablet(newTabletId);
-                    idx.addTablet(newTablet, null /* tablet meta */, true /* 
is restore */);
+                    newTablets.add(newTablet);
                     // replicas
                     if (Config.isCloudMode()) {
                         long newReplicaId = Env.getCurrentEnv().getNextId();
@@ -1015,6 +1017,9 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
                         return new Status(ErrCode.COMMON_ERROR, 
e.getMessage());
                     }
                 }
+                // add tablets to index in one batch; TabletInvertedIndex 
registration
+                // is intentionally skipped on the restore path (rebuilt 
separately).
+                idx.appendTablets(newTablets);
             }
 
             if (createNewColocateGroup) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index bcf74528da6..ec99c8d45a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -382,7 +382,14 @@ public class TabletStatMgr extends MasterDaemon {
         if (result.isSetTabletStatList()) {
             for (TTabletStat stat : result.getTabletStatList()) {
                 if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
-                    Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId);
+                    Replica replica;
+                    try {
+                        replica = invertedIndex.getReplica(stat.getTabletId(), beId);
+                    } catch (IllegalStateException e) {
+                        LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}",
+                                stat.getTabletId(), beId, e.getMessage());
+                        continue;
+                    }
                     if (replica != null) {
                         replica.setDataSize(stat.getDataSize());
                         replica.setRemoteDataSize(stat.getRemoteDataSize());
@@ -411,7 +418,14 @@ public class TabletStatMgr extends MasterDaemon {
                     // the replica is obsolete, ignore it.
                     continue;
                 }
-                Replica replica = invertedIndex.getReplica(entry.getKey(), beId);
+                Replica replica;
+                try {
+                    replica = invertedIndex.getReplica(entry.getKey(), beId);
+                } catch (IllegalStateException e) {
+                    LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}",
+                            entry.getKey(), beId, e.getMessage());
+                    continue;
+                }
                 if (replica == null) {
                     // replica may be deleted from catalog, ignore it.
                     continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
index f47695adecf..a9e6469a6d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
@@ -333,18 +333,22 @@ public class CloudRestoreJob extends RestoreJob {
             int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
             int remotetabletSize = remoteIdx.getTablets().size();
             remoteIdx.clearTabletsForRestore();
+            // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index.
+            List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
             for (int i = 0; i < remotetabletSize; i++) {
                 // generate new tablet id
                 long newTabletId = env.getNextId();
                 Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId);
-                // add tablet to index, but not add to TabletInvertedIndex
-                remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */);
+                newTablets.add(newTablet);
                 // replicas
                 long newReplicaId = Env.getCurrentEnv().getNextId();
                 Replica replica = new CloudReplica(newReplicaId, null, Replica.ReplicaState.NORMAL,
                         visibleVersion, schemaHash, dbId, localTbl.getId(), partitionId, remoteIdx.getId(), i);
                 newTablet.addReplica(replica, true /* is restore */);
             }
+            // add tablets to index in one batch; TabletInvertedIndex registration
+            // is intentionally skipped on the restore path (rebuilt separately).
+            remoteIdx.appendTablets(newTablets);
         }
         return remotePart;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 5bf096bc60e..cabc936772d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.catalog.CloudPartition;
@@ -82,6 +83,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -448,11 +450,16 @@ public class CloudInternalCatalog extends InternalCatalog {
     private void createCloudTablets(MaterializedIndex index, ReplicaState replicaState,
             DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc,
             TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+        // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's
+        // tablets list in a single copy-on-write after the loop (see
+        // InternalCatalog.createTablets for rationale).
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        List<Tablet> bucketTablets = new ArrayList<>(distributionInfo.getBucketNum());
         for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
             Tablet tablet = EnvFactory.getInstance().createTablet(Env.getCurrentEnv().getNextId());
 
-            // add tablet to inverted index first
-            index.addTablet(tablet, tabletMeta);
+            invertedIndex.addTablet(tablet.getId(), tabletMeta);
+            bucketTablets.add(tablet);
             tabletIdSet.add(tablet.getId());
 
             long replicaId = Env.getCurrentEnv().getNextId();
@@ -461,6 +468,7 @@ public class CloudInternalCatalog extends InternalCatalog {
                     tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i);
             tablet.addReplica(replica);
         }
+        index.appendTablets(bucketTablets);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index a445fd31fd4..15f78c23ff4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3419,12 +3419,19 @@ public class InternalCatalog implements CatalogIf<Database> {
             }
         }
 
+        // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's
+        // tablets list in a single copy-on-write after the loop (O(bucketNum) instead
+        // of O(bucketNum^2)). TabletInvertedIndex registration stays per-iteration
+        // because Tablet.addReplica(...) below needs the tablet present in the
+        // inverted index.
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        List<Tablet> bucketTablets = new ArrayList<>(distributionInfo.getBucketNum());
         for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
             // create a new tablet with random chosen backends
             Tablet tablet = EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId());
 
-            // add tablet to inverted index first
-            index.addTablet(tablet, tabletMeta);
+            invertedIndex.addTablet(tablet.getId(), tabletMeta);
+            bucketTablets.add(tablet);
             tabletIdSet.add(tablet.getId());
 
             // get BackendIds
@@ -3464,6 +3471,9 @@ public class InternalCatalog implements CatalogIf<Database> {
                     totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
         }
 
+        // Publish all bucket tablets to the materialized index in one batch.
+        index.appendTablets(bucketTablets);
+
         if (groupId != null && chooseBackendsArbitrary) {
             colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
             ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
index e177717d366..3ff3f2519a3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexState;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TStorageMedium;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -35,6 +36,8 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class MaterializedIndexTest {
 
@@ -73,6 +76,68 @@ public class MaterializedIndexTest {
         Assert.assertEquals(indexId, index.getId());
     }
 
+    @Test
+    public void testGetTabletsReturnsImmutableSnapshot() {
+        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD);
+        index.addTablet(new LocalTablet(1L), tabletMeta, true);
+
+        List<Tablet> snapshot = index.getTablets();
+        Assert.assertEquals(1, snapshot.size());
+
+        // A write after the snapshot was taken must not be visible in it (copy-on-write).
+        index.addTablet(new LocalTablet(2L), tabletMeta, true);
+        Assert.assertEquals(1, snapshot.size());
+        Assert.assertEquals(2, index.getTablets().size());
+
+        // The returned snapshot is read-only.
+        Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(new LocalTablet(3L)));
+    }
+
+    @Test
+    public void testConcurrentGetTabletsNeverThrows() throws InterruptedException {
+        // A reader repeatedly snapshots and iterates getTablets() while a writer keeps
+        // adding tablets. Copy-on-write guarantees the reader never observes a partially
+        // built list or throws ConcurrentModificationException.
+        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD);
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        Thread writer = new Thread(() -> {
+            long id = 1000L;
+            while (!stop.get()) {
+                index.addTablet(new LocalTablet(id++), tabletMeta, true);
+                // Keep the list bounded (and exercise the clear path) so the test stays fast.
+                if (index.getTablets().size() > 64) {
+                    index.clearTabletsForRestore();
+                }
+            }
+        });
+
+        Thread reader = new Thread(() -> {
+            try {
+                for (int i = 0; i < 50000 && error.get() == null; i++) {
+                    for (Tablet tablet : index.getTablets()) {
+                        tablet.getId();
+                    }
+                }
+            } catch (Throwable t) {
+                error.set(t);
+            } finally {
+                stop.set(true);
+            }
+        });
+
+        writer.start();
+        reader.start();
+        reader.join();
+        stop.set(true);
+        writer.join();
+
+        if (error.get() != null) {
+            Assert.fail("getTablets() iteration threw under concurrent mutation: " + error.get());
+        }
+    }
+
     @Test
     public void testSerialization() throws Exception {
         // 1. Write objects to file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index eaafb31000e..2f89dcdeea8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -38,6 +38,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TabletTest {
 
@@ -118,6 +121,82 @@ public class TabletTest {
         Assert.assertEquals(1, tablet.getReplicas().size());
     }
 
+    @Test
+    public void testGetReplicasReturnsImmutableSnapshot() {
+        List<Replica> snapshot = tablet.getReplicas();
+        Assert.assertEquals(3, snapshot.size());
+
+        // A write after the snapshot was taken must not be visible in it (copy-on-write).
+        Replica replica4 = new LocalReplica(4L, 4L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0);
+        tablet.addReplica(replica4);
+        Assert.assertEquals(3, snapshot.size());
+        Assert.assertEquals(4, tablet.getReplicas().size());
+
+        // The returned snapshot is read-only.
+        Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(replica4));
+    }
+
+    @Test
+    public void testIterateReplicasWhileMutatingDoesNotThrow() {
+        // Iterating the snapshot returned by getReplicas() must not throw
+        // ConcurrentModificationException even when the tablet is structurally modified
+        // during iteration.
+        int seen = 0;
+        for (Replica r : tablet.getReplicas()) {
+            Assert.assertNotNull(r);
+            tablet.addReplica(new LocalReplica(100L + seen, 100L + seen, 100L, 0, 200000L, 0, 3000L,
+                    ReplicaState.NORMAL, 0, 0));
+            tablet.deleteReplicaByBackendId(2L);
+            seen++;
+        }
+        Assert.assertEquals(3, seen);
+    }
+
+    @Test
+    public void testConcurrentGetReplicasNeverThrows() throws InterruptedException {
+        // A reader repeatedly snapshots and iterates getReplicas() while a writer keeps
+        // mutating the replica list. Copy-on-write guarantees the reader never observes a
+        // partially built list or throws ConcurrentModificationException.
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        Thread writer = new Thread(() -> {
+            long id = 1000L;
+            while (!stop.get()) {
+                // Reuse a small set of backend ids so the list stays bounded while still
+                // exercising the add/replace path.
+                long beId = id % 8;
+                tablet.addReplica(new LocalReplica(id, beId, 100L, 0, 200000L, 0, 3000L,
+                        ReplicaState.NORMAL, 0, 0), true);
+                id++;
+            }
+        });
+
+        Thread reader = new Thread(() -> {
+            try {
+                for (int i = 0; i < 50000 && error.get() == null; i++) {
+                    for (Replica r : tablet.getReplicas()) {
+                        r.getId();
+                    }
+                }
+            } catch (Throwable t) {
+                error.set(t);
+            } finally {
+                stop.set(true);
+            }
+        });
+
+        writer.start();
+        reader.start();
+        reader.join();
+        stop.set(true);
+        writer.join();
+
+        if (error.get() != null) {
+            Assert.fail("getReplicas() iteration threw under concurrent mutation: " + error.get());
+        }
+    }
+
     @Test
     public void testSerialization() throws Exception {
         final Path path = Files.createTempFile("olapTabletTest", "tmp");
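
Illustrative note (not part of the commit): the tests above rely on three properties of the getters: the returned list is a fixed snapshot, it is read-only, and iterating it never throws while writers run. The sketch below shows one way to get those properties with a volatile reference and copy-on-write. SnapshotList and its methods are hypothetical names, not Doris classes, and the use of synchronized writers is an assumption rather than something this diff shows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not a class from the Doris code base.
final class SnapshotList<T> {
    // Readers only ever see a fully built, never-mutated list through this field.
    private volatile List<T> items = Collections.emptyList();

    // Writers serialize on the monitor (an assumption of this sketch),
    // copy the current list, change the copy, then publish it.
    synchronized void add(T item) {
        List<T> copy = new ArrayList<>(items);
        copy.add(item);
        items = Collections.unmodifiableList(copy);
    }

    // Bulk append: one copy for the whole batch instead of one copy per element.
    synchronized void appendAll(List<T> batch) {
        List<T> copy = new ArrayList<>(items.size() + batch.size());
        copy.addAll(items);
        copy.addAll(batch);
        items = Collections.unmodifiableList(copy);
    }

    synchronized void clear() {
        items = Collections.emptyList();
    }

    // Lock-free read: returns whichever list was published most recently.
    List<T> snapshot() {
        return items;
    }

    public static void main(String[] args) {
        SnapshotList<Long> list = new SnapshotList<>();
        list.add(1L);
        List<Long> snap = list.snapshot();
        for (Long id : snap) {
            // Mutating during iteration publishes a new list; snap is untouched.
            list.add(id + 100);
        }
        System.out.println(snap.size() + " -> " + list.snapshot().size());
    }
}
```

Because a published list is never modified afterwards, copying it inside a writer cannot race with another mutation. The bulk method mirrors the reason the diff collects tablets locally and publishes them once: per-element copy-on-write would copy the list on every add.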

