This is an automated email from the ASF dual-hosted git repository.
yx-keith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 962b88fe624 [fix](fe) avoid concurrent tablet stat iteration failures (#63298)
962b88fe624 is described below
commit 962b88fe62400d67ba4fd91c45f1c288c74c9856
Author: yaoxiao <[email protected]>
AuthorDate: Sat May 23 16:01:35 2026 +0800
[fix](fe) avoid concurrent tablet stat iteration failures (#63298)
### What problem does this PR solve?
Issue Number: #59138
Problem Summary:
TabletStatMgr.runAfterCatalogReady() is a periodic master-FE daemon that
iterates every tablet/replica to pull statistics. When DDL runs
concurrently with this daemon, two races can occur:
Iteration race (CME). MaterializedIndex.tablets and LocalTablet.replicas
were plain ArrayLists whose getters returned the internal list. A
concurrent addTablet / addReplica / deleteReplica (from clone, repair,
schema change, restore, or the report handler) during iteration tripped
the fail-fast iterator, which threw ConcurrentModificationException.
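The sketch below is a minimal, single-threaded illustration of this failure mode. The class and method names are illustrative and do not come from Doris. `ArrayList`'s iterator is fail-fast: the next `next()` call detects any structural change that was not made through the iterator itself. In the daemon the change comes from another thread, and there detection is only best-effort. Modifying the list inside the loop trips the same check deterministically.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Illustrative only: shows the fail-fast check, not Doris code.
class FailFastDemo {
    // Returns true if iterating while structurally modifying the list throws CME.
    static boolean iterateWhileAdding(List<Long> replicaIds) {
        try {
            for (Long id : replicaIds) {
                if (id == 1L) {
                    // Structural modification behind the iterator's back,
                    // standing in for a concurrent addReplica(...).
                    replicaIds.add(99L);
                }
            }
            return false; // no exception observed
        } catch (ConcurrentModificationException e) {
            return true; // the iterator noticed modCount changed
        }
    }

    public static void main(String[] args) {
        List<Long> replicas = new ArrayList<>(List.of(1L, 2L, 3L));
        System.out.println("CME thrown: " + iterateWhileAdding(replicas));
    }
}
```

Running `main` prints `CME thrown: true`.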
TOCTOU race. In updateTabletStat, a getTabletMeta(id) != null check is
followed by getReplica(id, beId). If the tablet is removed in between,
getReplica hits Preconditions.checkState(...) and throws
IllegalStateException.
When the daemon throws, the rest of the current cycle is abandoned,
leaving stale tablet/partition/table sizes and skewed MetricRepo metrics
until the next cycle.
Solution:
Close the CME race for good with copy-on-write behind a volatile snapshot
reference. A first attempt returned a defensive copy (Lists.newArrayList(...)),
but the copy itself iterates the source list and can still CME mid-copy;
the window shrank but did not close. This PR instead:
- Makes LocalTablet.replicas and MaterializedIndex.tablets (together with
its idToTablets lookup map) volatile.
- Writers (addReplica / deleteReplica / deleteReplicaByBackendId /
addTablet / appendTablets / clearTabletsForRestore) are synchronized,
build a new list, and atomically swap the volatile reference; they never
mutate a published list in place.
- Readers (getReplicas() / getTablets()) do a single volatile read and
return an unmodifiable view (Collections.unmodifiableList) of a list that
is never mutated after publication, so it behaves as an immutable
snapshot. Iteration is lock-free and can never CME, and the hot read path
no longer copies elements.
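The writer/reader split above can be sketched as a small generic holder. This sketch assumes a hypothetical `CowSnapshotList` class and is not the actual `LocalTablet`/`MaterializedIndex` code. Writers synchronize, copy, and swap the reference. Readers perform one volatile read and wrap the result in `Collections.unmodifiableList` without copying any elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper illustrating the pattern; not a Doris class.
final class CowSnapshotList<T> {
    // Only ever replaced, never mutated after being assigned here.
    private volatile List<T> items = new ArrayList<>();

    // Reader: one volatile read, no lock, no element copy.
    List<T> snapshot() {
        return Collections.unmodifiableList(items);
    }

    // Writer: copy the current list, modify the copy, then publish it.
    synchronized void add(T item) {
        List<T> current = items;
        List<T> next = new ArrayList<>(current.size() + 1);
        next.addAll(current);
        next.add(item);
        items = next; // volatile write publishes the fully built list
    }

    synchronized boolean removeIf(Predicate<? super T> filter) {
        List<T> next = new ArrayList<>(items);
        boolean changed = next.removeIf(filter);
        if (changed) {
            items = next; // publish only when something was actually removed
        }
        return changed;
    }
}

class CowSnapshotDemo {
    public static void main(String[] args) {
        CowSnapshotList<String> replicas = new CowSnapshotList<>();
        replicas.add("be-1");
        replicas.add("be-2");
        List<String> view = replicas.snapshot();
        for (String r : view) {
            // Writing while iterating is safe: the writer publishes a new list
            // and the list behind `view` is never touched.
            replicas.add(r + "-clone");
        }
        System.out.println(view.size() + " " + replicas.snapshot().size());
    }
}
```

`main` prints `2 4`. The view taken before the loop still has two elements because each write published a fresh list instead of mutating the one being iterated.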
Writers must be synchronized (volatile alone is not enough) because some
write paths do not hold the OlapTable write lock. Tracing call sites
confirms this: InternalCatalog.createPartitionWithIndices and
RestoreJob.resetPartitionForRestore call addReplica/addTablet without
the table write lock, so concurrent writers are real. With only a
volatile field, two writers could copy the same list, and the second swap
would silently discard the first writer's change (a lost update).
Writers are infrequent (DDL / repair / restore), so the lock cost is
negligible; reads stay lock-free.
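The following sketch shows why the writer lock matters. It assumes a hypothetical `SyncedCowAppender` class in which several threads append through a read-copy-swap writer. With `synchronized`, the final size always equals the number of appends. Removing the keyword reopens the lost-update window described above, although whether a given run actually loses updates depends on thread scheduling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class: read-copy-swap writer guarded by `synchronized`.
final class SyncedCowAppender {
    private volatile List<Integer> items = new ArrayList<>();

    // Without `synchronized`, two threads could read the same `current`
    // and the later swap would discard the earlier thread's element.
    synchronized void append(int value) {
        List<Integer> current = items;
        List<Integer> next = new ArrayList<>(current.size() + 1);
        next.addAll(current);
        next.add(value);
        items = next;
    }

    int size() {
        return items.size(); // lock-free read of the published list
    }

    static int runConcurrentWriters(int threads, int perThread) throws InterruptedException {
        SyncedCowAppender appender = new SyncedCowAppender();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    appender.append(i);
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return appender.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // With synchronized writers this always equals threads * perThread.
        System.out.println(runConcurrentWriters(4, 500));
    }
}
```

`main` prints `2000`.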
The TOCTOU race is handled by catching IllegalStateException around
getReplica (kept from the original fix) and counting the skip via a new
TabletStatMgr.staleTabletStatSkipped counter, which makes the race
observable (a value > 0 proves the window was actually hit) instead of
relying solely on log scraping.
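The skip-and-count handling can be sketched as shown below. `FakeInvertedIndex`, `applyStats`, the `raceHook` parameter, and the `AtomicLong` counter are all hypothetical stand-ins introduced for illustration. The real code uses `TabletInvertedIndex` and `Preconditions.checkState`. The hook deterministically drops a tablet inside the window between the meta check and the replica lookup.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class StaleStatSkipDemo {
    // Hypothetical counter standing in for TabletStatMgr.staleTabletStatSkipped.
    static final AtomicLong staleTabletStatSkipped = new AtomicLong();

    // Hypothetical stand-in for TabletInvertedIndex.
    static final class FakeInvertedIndex {
        final Map<Long, String> tabletMeta = new ConcurrentHashMap<>();

        String getTabletMeta(long tabletId) {
            return tabletMeta.get(tabletId);
        }

        // Throws like a failed Preconditions.checkState(...) once the tablet is gone.
        String getReplica(long tabletId, long beId) {
            if (!tabletMeta.containsKey(tabletId)) {
                throw new IllegalStateException("tablet " + tabletId + " does not exist");
            }
            return "replica-" + tabletId + "@" + beId;
        }
    }

    static int applyStats(FakeInvertedIndex index, List<Long> tabletIds, long beId,
            LongConsumer raceHook) {
        int applied = 0;
        for (long tabletId : tabletIds) {
            if (index.getTabletMeta(tabletId) == null) {
                continue; // tablet already gone: nothing to update
            }
            raceHook.accept(tabletId); // simulates a concurrent drop inside the TOCTOU window
            String replica;
            try {
                replica = index.getReplica(tabletId, beId);
            } catch (IllegalStateException e) {
                staleTabletStatSkipped.incrementAndGet(); // observable instead of log-only
                continue;
            }
            if (replica != null) {
                applied++; // real code would copy data sizes onto the replica here
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        FakeInvertedIndex index = new FakeInvertedIndex();
        index.tabletMeta.put(1L, "meta-1");
        index.tabletMeta.put(2L, "meta-2");
        int applied = applyStats(index, List.of(1L, 2L), 10001L, id -> {
            if (id == 2L) {
                index.tabletMeta.remove(2L);
            }
        });
        System.out.println(applied + " applied, " + staleTabletStatSkipped.get() + " skipped");
    }
}
```

`main` prints `1 applied, 1 skipped`. Tablet 1 is updated. Tablet 2 disappears after its meta check and is counted instead of aborting the loop.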
Cloud path: CloudTabletStatMgr.updateStatInfo iterates
tablet.getReplicas() and is covered by the same snapshot fix; its
updateTabletStat uses getReplicasByTabletId (locked, returns empty list,
no checkState) and is already safe.
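The diff below also batches tablet creation through a new `MaterializedIndex.appendTablets(...)`. It publishes the `idToTablets` map before the `tablets` list, so any id visible in the list already resolves by lookup. The following sketch assumes tablets are modeled as plain `Long` ids and uses a hypothetical `TabletIndexSketch` class. Adding n tablets one at a time would copy 0 + 1 + ... + (n - 1) = n(n - 1)/2 existing elements in total. One batched append copies each existing element once.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; tablets are modeled as Long ids for brevity.
final class TabletIndexSketch {
    private volatile Map<Long, Long> idToTablets = new HashMap<>();
    private volatile List<Long> tablets = new ArrayList<>();

    // One copy of the existing map and list per batch: O(existing + batch).
    synchronized void appendTablets(Collection<Long> newTablets) {
        if (newTablets.isEmpty()) {
            return;
        }
        Map<Long, Long> nextMap = new HashMap<>(idToTablets);
        List<Long> nextList = new ArrayList<>(tablets.size() + newTablets.size());
        nextList.addAll(tablets);
        for (Long t : newTablets) {
            nextMap.put(t, t);
            nextList.add(t);
        }
        idToTablets = nextMap; // map first: ids in the list always resolve
        tablets = nextList;
    }

    synchronized void clearTabletsForRestore() {
        tablets = new ArrayList<>(); // list first on clear, keeping tablets ⊆ idToTablets
        idToTablets = new HashMap<>();
    }

    List<Long> getTablets() {
        return Collections.unmodifiableList(tablets);
    }

    Long getTablet(long id) {
        return idToTablets.get(id);
    }
}

class TabletIndexSketchDemo {
    public static void main(String[] args) {
        TabletIndexSketch index = new TabletIndexSketch();
        List<Long> batch = new ArrayList<>();
        for (long id = 100; id < 108; id++) {
            batch.add(id); // collect locally, as the bulk-creation loops in the diff do
        }
        index.appendTablets(batch); // one copy-on-write for the whole batch
        boolean allResolvable = true;
        for (Long id : index.getTablets()) {
            allResolvable &= index.getTablet(id) != null;
        }
        System.out.println(index.getTablets().size() + " " + allResolvable);
    }
}
```

`main` prints `8 true`.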
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Co-authored-by: morningman <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../doris/alter/MaterializedViewHandler.java | 17 +++-
.../apache/doris/alter/SchemaChangeHandler.java | 19 ++++-
.../java/org/apache/doris/backup/RestoreJob.java | 9 +-
.../apache/doris/catalog/CloudTabletStatMgr.java | 1 +
.../java/org/apache/doris/catalog/LocalTablet.java | 96 +++++++++++++---------
.../apache/doris/catalog/MaterializedIndex.java | 89 ++++++++++++++++----
.../java/org/apache/doris/catalog/OlapTable.java | 7 +-
.../org/apache/doris/catalog/TabletStatMgr.java | 18 +++-
.../apache/doris/cloud/backup/CloudRestoreJob.java | 8 +-
.../cloud/datasource/CloudInternalCatalog.java | 12 ++-
.../apache/doris/datasource/InternalCatalog.java | 14 +++-
.../doris/catalog/MaterializedIndexTest.java | 65 +++++++++++++++
.../java/org/apache/doris/catalog/TabletTest.java | 79 ++++++++++++++++++
13 files changed, 360 insertions(+), 74 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index f875c807847..700fad2f552 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -443,14 +443,22 @@ public class MaterializedViewHandler extends AlterHandler {
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
+ // All MV tablets of the same (partition, mv index) share the same TabletMeta;
+ // build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet
+ // loop to keep copy-on-write O(n). TabletInvertedIndex registration stays
+ // per-iteration because Tablet.addReplica(...) below needs the tablet present
+ // in the inverted index.
+ TabletMeta mvTabletMeta = new TabletMeta(
+ dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
+ List<Tablet> mvTabletsForPartition = Lists.newArrayListWithCapacity(baseIndex.getTablets().size());
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet baseTablet : baseIndex.getTablets()) {
- TabletMeta mvTabletMeta = new TabletMeta(
- dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();
Tablet newTablet = EnvFactory.getInstance().createTablet(mvTabletId);
- mvIndex.addTablet(newTablet, mvTabletMeta);
+ invertedIndex.addTablet(mvTabletId, mvTabletMeta);
+ mvTabletsForPartition.add(newTablet);
addedTablets.add(newTablet);
mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
@@ -505,6 +513,9 @@ public class MaterializedViewHandler extends AlterHandler {
}
} // end for baseTablets
+ // Bulk-publish all MV tablets for this partition in one copy-on-write.
+ mvIndex.appendTablets(mvTabletsForPartition);
+
mvJob.addMVIndex(partitionId, mvIndex);
if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 659908873ef..4f56b4aa94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -54,6 +54,7 @@ import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.catalog.info.ColumnPosition;
import org.apache.doris.catalog.info.IndexType;
@@ -2050,14 +2051,23 @@ public class SchemaChangeHandler extends AlterHandler {
MaterializedIndex originIndex = partition.getIndex(originIndexId);
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+ // All shadow tablets of the same (partition, shadow index) share the same TabletMeta;
+ // build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet
+ // loop to keep copy-on-write O(n). TabletInvertedIndex registration stays
+ // per-iteration because Tablet.addReplica(...) below needs the tablet present
+ // in the inverted index.
+ TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
+ newSchemaHash, medium);
+ List<Tablet> shadowTabletsForPartition = Lists.newArrayListWithCapacity(
+ originIndex.getTablets().size());
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet originTablet : originIndex.getTablets()) {
- TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
- newSchemaHash, medium);
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();
Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId);
- shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
+ invertedIndex.addTablet(shadowTabletId, shadowTabletMeta);
+ shadowTabletsForPartition.add(shadowTablet);
addedTablets.add(shadowTablet);
schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId);
@@ -2115,6 +2125,9 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
+ // Bulk-publish all shadow tablets for this partition in one copy-on-write.
+ shadowIndex.appendTablets(shadowTabletsForPartition);
+
schemaChangeJob.addPartitionShadowIndex(partitionId, shadowIndexId, shadowIndex);
} // end for partition
schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId,
newIndexName, newSchemaVersion, newSchemaHash,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 528cbff5018..0aaa7140aaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -109,6 +109,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -1553,12 +1554,13 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
remoteIdx.clearTabletsForRestore();
+ // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index.
+ List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
for (int i = 0; i < remotetabletSize; i++) {
// generate new tablet id
long newTabletId = env.getNextId();
Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId);
- // add tablet to index, but not add to TabletInvertedIndex
- remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */);
+ newTablets.add(newTablet);
// replicas
try {
Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium
= Env.getCurrentSystemInfo()
@@ -1577,6 +1579,9 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
return null;
}
}
+ // add tablets to index in one batch; TabletInvertedIndex registration
+ // is intentionally skipped on the restore path (rebuilt separately).
+ remoteIdx.appendTablets(newTablets);
}
return remotePart;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index 6765a001492..118aa401227 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -366,6 +366,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
long tabletIndexSize = 0L;
long tabletSegmentSize = 0L;
+ // getReplicas() returns an immutable volatile snapshot; CME-safe under concurrent DDL.
for (Replica replica : tablet.getReplicas()) {
if (replica.getDataSize() > tabletDataSize) {
tabletDataSize = replica.getDataSize();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
index 14d9171f3ff..5db0a1286b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
@@ -31,8 +31,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;
@@ -40,7 +40,7 @@ public class LocalTablet extends Tablet {
private static final Logger LOG = LogManager.getLogger(LocalTablet.class);
@SerializedName(value = "rs", alternate = {"replicas"})
- private List<Replica> replicas;
+ private volatile List<Replica> replicas;
@SerializedName(value = "lastCheckTime")
private long lastCheckTime;
@@ -109,13 +109,14 @@ public class LocalTablet extends Tablet {
if (cooldownReplicaId <= 0) {
return 0;
}
- for (Replica r : replicas) {
+ List<Replica> snapshot = replicas; // single volatile read; reuse below
+ for (Replica r : snapshot) {
if (r.getId() == cooldownReplicaId) {
return r.getRemoteDataSize();
}
}
// return replica with max remoteDataSize
- return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
+ return snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
}
@Override
@@ -223,29 +224,32 @@ public class LocalTablet extends Tablet {
this.lastCheckTime = lastCheckTime;
}
- private boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
- boolean delete = false;
+ // Writers are synchronized on this tablet to prevent concurrent lost-update:
+ // some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT hold
+ // the OlapTable write lock when modifying replicas.
+ // Readers capture the volatile reference once and iterate freely — no lock needed.
+
+ @Override
+ public synchronized void addReplica(Replica replica, boolean isRestore) {
+ long version = replica.getVersion();
+ long backendId = replica.getBackendIdWithoutException();
boolean hasBackend = false;
- long version = newReplica.getVersion();
- Iterator<Replica> iterator = replicas.iterator();
- while (iterator.hasNext()) {
- Replica replica = iterator.next();
- if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) {
+ boolean deletedOld = false;
+ List<Replica> current = replicas;
+ List<Replica> next = new ArrayList<>(current.size() + 1);
+ for (Replica r : current) {
+ if (r.getBackendIdWithoutException() == backendId) {
hasBackend = true;
- if (replica.getVersion() <= version) {
- iterator.remove();
- delete = true;
+ if (r.getVersion() <= version) {
+ deletedOld = true;
+ continue; // drop stale replica
}
}
+ next.add(r);
}
-
- return delete || !hasBackend;
- }
-
- @Override
- public void addReplica(Replica replica, boolean isRestore) {
- if (isLatestReplicaAndDeleteOld(replica)) {
- replicas.add(replica);
+ if (deletedOld || !hasBackend) {
+ next.add(replica);
+ replicas = next; // volatile write; readers see the new immutable snapshot
if (!isRestore) {
Env.getCurrentInvertedIndex().addReplica(id, replica);
}
@@ -254,12 +258,13 @@ public class LocalTablet extends Tablet {
@Override
public List<Replica> getReplicas() {
- return this.replicas;
+ // Volatile read: returns the current immutable snapshot; callers iterate without locking.
+ return Collections.unmodifiableList(replicas);
}
@Override
public Replica getReplicaByBackendId(long backendId) {
- for (Replica replica : replicas) {
+ for (Replica replica : replicas) { // single volatile read
if (replica.getBackendIdWithoutException() == backendId) {
return replica;
}
@@ -268,9 +273,12 @@ public class LocalTablet extends Tablet {
}
@Override
- public boolean deleteReplica(Replica replica) {
- if (replicas.contains(replica)) {
- replicas.remove(replica);
+ public synchronized boolean deleteReplica(Replica replica) {
+ List<Replica> current = replicas;
+ if (current.contains(replica)) {
+ List<Replica> next = new ArrayList<>(current);
+ next.remove(replica);
+ replicas = next; // volatile write
Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException());
return true;
}
@@ -278,16 +286,22 @@ public class LocalTablet extends Tablet {
}
@Override
- public boolean deleteReplicaByBackendId(long backendId) {
- Iterator<Replica> iterator = replicas.iterator();
- while (iterator.hasNext()) {
- Replica replica = iterator.next();
+ public synchronized boolean deleteReplicaByBackendId(long backendId) {
+ List<Replica> current = replicas;
+ List<Replica> next = new ArrayList<>(current.size());
+ Replica found = null;
+ for (Replica replica : current) {
if (replica.getBackendIdWithoutException() == backendId) {
- iterator.remove();
- Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
- return true;
+ found = replica;
+ } else {
+ next.add(replica);
}
}
+ if (found != null) {
+ replicas = next; // volatile write
+ Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
+ return true;
+ }
return false;
}
@@ -302,13 +316,17 @@ public class LocalTablet extends Tablet {
LocalTablet tablet = (LocalTablet) obj;
- if (replicas != tablet.replicas) {
- if (replicas.size() != tablet.replicas.size()) {
+ // Capture one snapshot per side so a concurrent writer cannot publish
+ // a different list between size/contains/get calls below.
+ List<Replica> thisReplicas = replicas;
+ List<Replica> otherReplicas = tablet.replicas;
+ if (thisReplicas != otherReplicas) {
+ if (thisReplicas.size() != otherReplicas.size()) {
return false;
}
- int size = replicas.size();
+ int size = thisReplicas.size();
for (int i = 0; i < size; i++) {
- if (!tablet.replicas.contains(replicas.get(i))) {
+ if (!otherReplicas.contains(thisReplicas.get(i))) {
return false;
}
}
@@ -334,7 +352,7 @@ public class LocalTablet extends Tablet {
}
boolean allBeAliveOrDecommissioned = true;
- for (Replica replica : replicas) {
+ for (Replica replica : replicas) { // single volatile read; iteration on the snapshot
Backend backend = infoService.getBackend(replica.getBackendIdWithoutException());
if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) {
allBeAliveOrDecommissioned = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
index 2e98354acef..f84ff018ef6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
@@ -19,10 +19,11 @@ package org.apache.doris.catalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,10 +58,16 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
@SerializedName(value = "rowCount")
private long rowCount;
- private Map<Long, Tablet> idToTablets;
+ // Published as a volatile immutable snapshot in lockstep with `tablets`.
+ // Writers (synchronized) build a fresh HashMap and assign the field; readers
+ // capture the reference once and call get/containsKey on the snapshot.
+ // Invariant: `tablets ⊆ idToTablets` — any tablet visible in the list is also
+ // present in the map. This is preserved by publishing the map BEFORE the list
+ // on add and the list BEFORE the map on clear.
+ private volatile Map<Long, Tablet> idToTablets;
@SerializedName(value = "tablets")
// this is for keeping tablet order
- private List<Tablet> tablets;
+ private volatile List<Tablet> tablets;
// for push after rollup index finished
@SerializedName(value = "rollupIndexId")
@@ -94,38 +101,78 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
}
public List<Tablet> getTablets() {
- return tablets;
+ // Volatile read: returns the current immutable snapshot; callers iterate without locking.
+ return Collections.unmodifiableList(tablets);
}
public List<Long> getTabletIdsInOrder() {
- List<Long> tabletIds = Lists.newArrayListWithCapacity(tablets.size());
- for (Tablet tablet : tablets) {
+ List<Tablet> snapshot = tablets; // single volatile read
+ List<Long> tabletIds = new ArrayList<>(snapshot.size());
+ for (Tablet tablet : snapshot) {
tabletIds.add(tablet.getId());
}
return tabletIds;
}
public Tablet getTablet(long tabletId) {
+ // Single volatile read of the immutable map snapshot.
return idToTablets.get(tabletId);
}
- public void clearTabletsForRestore() {
- idToTablets.clear();
- tablets.clear();
+ public synchronized void clearTabletsForRestore() {
+ // Drop the list first so iteration stops seeing tablets before
+ // lookup-by-id drops them. Maintains tablets ⊆ idToTablets.
+ tablets = new ArrayList<>();
+ idToTablets = new HashMap<>();
}
- public void addTablet(Tablet tablet, TabletMeta tabletMeta) {
+ public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) {
addTablet(tablet, tabletMeta, false);
}
- public void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) {
- idToTablets.put(tablet.getId(), tablet);
- tablets.add(tablet);
+ // Writers are synchronized on this index to prevent concurrent lost-update:
+ // some callers (e.g. InternalCatalog.createTablets) do NOT hold the OlapTable
+ // write lock when adding tablets.
+ // Copy-on-write keeps readers CME-safe without locking; for bulk creation use
+ // appendTablets(...) so the per-index tablets list is copied once per batch
+ // instead of once per tablet.
+ public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) {
+ appendTabletsInternal(Collections.singletonList(tablet));
if (!isRestore) {
Env.getCurrentInvertedIndex().addTablet(tablet.getId(), tabletMeta);
}
}
+ // Bulk-publish: append the given tablets to this index's tablets list in a
+ // single copy-on-write (O(existing + batch) instead of O(n^2) over n
+ // single-tablet adds inside a synchronized block).
+ //
+ // Does NOT touch TabletInvertedIndex. Bulk-creation callers register tablets
+ // in TabletInvertedIndex eagerly inside their per-tablet loop because
+ // Tablet.addReplica(...) (non-restore) requires the tablet to already be
+ // present in the inverted index; only the per-index list copy is expensive
+ // enough to be worth batching.
+ public synchronized void appendTablets(Collection<Tablet> newTablets) {
+ appendTabletsInternal(newTablets);
+ }
+
+ private void appendTabletsInternal(Collection<Tablet> newTablets) {
+ if (newTablets.isEmpty()) {
+ return;
+ }
+ Map<Long, Tablet> nextMap = new HashMap<>(idToTablets);
+ List<Tablet> nextList = new ArrayList<>(tablets.size() + newTablets.size());
+ nextList.addAll(tablets);
+ for (Tablet tablet : newTablets) {
+ nextMap.put(tablet.getId(), tablet);
+ nextList.add(tablet);
+ }
+ // Publish the map first, then the list — so any id that appears in the
+ // visible `tablets` snapshot is already present in `idToTablets`.
+ idToTablets = nextMap;
+ tablets = nextList;
+ }
+
public void setIdForRestore(long idxId) {
this.id = idxId;
}
@@ -241,8 +288,9 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
}
public int getTabletOrderIdx(long tabletId) {
+ List<Tablet> snapshot = tablets; // single volatile read
int idx = 0;
- for (Tablet tablet : tablets) {
+ for (Tablet tablet : snapshot) {
if (tablet.getId() == tabletId) {
return idx;
}
@@ -279,15 +327,16 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
@Override
public String toString() {
+ List<Tablet> snapshot = tablets; // single volatile read
StringBuilder buffer = new StringBuilder();
buffer.append("index id: ").append(id).append("; ");
buffer.append("index state: ").append(state.name()).append("; ");
buffer.append("row count: ").append(rowCount).append("; ");
- buffer.append("tablets size: ").append(tablets.size()).append("; ");
+ buffer.append("tablets size: ").append(snapshot.size()).append("; ");
//
buffer.append("tablets: [");
- for (Tablet tablet : tablets) {
+ for (Tablet tablet : snapshot) {
buffer.append("tablet: ").append(tablet.toString()).append(", ");
}
buffer.append("]; ");
@@ -300,9 +349,13 @@ public class MaterializedIndex extends MetaObject implements GsonPostProcessable
@Override
public void gsonPostProcess() {
- // build "idToTablets" from "tablets"
+ // Build a fresh "idToTablets" snapshot from the deserialized "tablets" list.
+ // Runs single-threaded during gson deserialization, before any concurrent
+ // reader can observe this object.
+ Map<Long, Tablet> map = new HashMap<>(tablets.size());
for (Tablet tablet : tablets) {
- idToTablets.put(tablet.getId(), tablet);
+ map.put(tablet.getId(), tablet);
}
+ idToTablets = map;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 2482a0514e4..38cd24c9e2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -972,10 +972,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
// generate new tablets in origin tablet order
int tabletNum = idx.getTablets().size();
idx.clearTabletsForRestore();
+ // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index.
+ List<Tablet> newTablets = new ArrayList<>(tabletNum);
for (int i = 0; i < tabletNum; i++) {
long newTabletId = env.getNextId();
Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId);
- idx.addTablet(newTablet, null /* tablet meta */, true /* is restore */);
+ newTablets.add(newTablet);
// replicas
if (Config.isCloudMode()) {
long newReplicaId = Env.getCurrentEnv().getNextId();
@@ -1015,6 +1017,9 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
return new Status(ErrCode.COMMON_ERROR, e.getMessage());
}
}
+ // add tablets to index in one batch; TabletInvertedIndex registration
+ // is intentionally skipped on the restore path (rebuilt separately).
+ idx.appendTablets(newTablets);
}
if (createNewColocateGroup) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index bcf74528da6..ec99c8d45a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -382,7 +382,14 @@ public class TabletStatMgr extends MasterDaemon {
if (result.isSetTabletStatList()) {
for (TTabletStat stat : result.getTabletStatList()) {
if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
- Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId);
+ Replica replica;
+ try {
+ replica = invertedIndex.getReplica(stat.getTabletId(), beId);
+ } catch (IllegalStateException e) {
+ LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}",
+ stat.getTabletId(), beId, e.getMessage());
+ continue;
+ }
if (replica != null) {
replica.setDataSize(stat.getDataSize());
replica.setRemoteDataSize(stat.getRemoteDataSize());
@@ -411,7 +418,14 @@ public class TabletStatMgr extends MasterDaemon {
// the replica is obsolete, ignore it.
continue;
}
- Replica replica = invertedIndex.getReplica(entry.getKey(), beId);
+ Replica replica;
+ try {
+ replica = invertedIndex.getReplica(entry.getKey(), beId);
+ } catch (IllegalStateException e) {
+ LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}",
+ entry.getKey(), beId, e.getMessage());
+ continue;
+ }
if (replica == null) {
// replica may be deleted from catalog, ignore it.
continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
index f47695adecf..a9e6469a6d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
@@ -333,18 +333,22 @@ public class CloudRestoreJob extends RestoreJob {
int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
remoteIdx.clearTabletsForRestore();
+ // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index.
+ List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
for (int i = 0; i < remotetabletSize; i++) {
// generate new tablet id
long newTabletId = env.getNextId();
Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId);
- // add tablet to index, but not add to TabletInvertedIndex
- remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */);
+ newTablets.add(newTablet);
// replicas
long newReplicaId = Env.getCurrentEnv().getNextId();
Replica replica = new CloudReplica(newReplicaId, null,
Replica.ReplicaState.NORMAL,
visibleVersion, schemaHash, dbId, localTbl.getId(),
partitionId, remoteIdx.getId(), i);
newTablet.addReplica(replica, true /* is restore */);
}
+ // add tablets to index in one batch; TabletInvertedIndex registration
+ // is intentionally skipped on the restore path (rebuilt separately).
+ remoteIdx.appendTablets(newTablets);
}
return remotePart;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 5bf096bc60e..cabc936772d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.CloudPartition;
@@ -82,6 +83,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -448,11 +450,16 @@ public class CloudInternalCatalog extends InternalCatalog {
private void createCloudTablets(MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc,
TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+ // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's
+ // tablets list in a single copy-on-write after the loop (see
+ // InternalCatalog.createTablets for rationale).
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ List<Tablet> bucketTablets = new ArrayList<>(distributionInfo.getBucketNum());
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
Tablet tablet = EnvFactory.getInstance().createTablet(Env.getCurrentEnv().getNextId());
- // add tablet to inverted index first
- index.addTablet(tablet, tabletMeta);
+ invertedIndex.addTablet(tablet.getId(), tabletMeta);
+ bucketTablets.add(tablet);
tabletIdSet.add(tablet.getId());
long replicaId = Env.getCurrentEnv().getNextId();
@@ -461,6 +468,7 @@ public class CloudInternalCatalog extends InternalCatalog {
tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i);
tablet.addReplica(replica);
}
+ index.appendTablets(bucketTablets);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index a445fd31fd4..15f78c23ff4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3419,12 +3419,19 @@ public class InternalCatalog implements CatalogIf<Database> {
}
}
+ // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's
+ // tablets list in a single copy-on-write after the loop (O(bucketNum) instead
+ // of O(bucketNum^2)). TabletInvertedIndex registration stays per-iteration
+ // because Tablet.addReplica(...) below needs the tablet present in the
+ // inverted index.
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ List<Tablet> bucketTablets = new ArrayList<>(distributionInfo.getBucketNum());
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
// create a new tablet with random chosen backends
Tablet tablet =
EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId());
- // add tablet to inverted index first
- index.addTablet(tablet, tabletMeta);
+ invertedIndex.addTablet(tablet.getId(), tabletMeta);
+ bucketTablets.add(tablet);
tabletIdSet.add(tablet.getId());
// get BackendIds
@@ -3464,6 +3471,9 @@ public class InternalCatalog implements CatalogIf<Database> {
totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum());
}
+ // Publish all bucket tablets to the materialized index in one batch.
+ index.appendTablets(bucketTablets);
+
if (groupId != null && chooseBackendsArbitrary) {
colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
index e177717d366..3ff3f2519a3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TStorageMedium;
import org.junit.After;
import org.junit.Assert;
@@ -35,6 +36,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class MaterializedIndexTest {
@@ -73,6 +76,68 @@ public class MaterializedIndexTest {
Assert.assertEquals(indexId, index.getId());
}
+ @Test
+ public void testGetTabletsReturnsImmutableSnapshot() {
+ TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD);
+ index.addTablet(new LocalTablet(1L), tabletMeta, true);
+
+ List<Tablet> snapshot = index.getTablets();
+ Assert.assertEquals(1, snapshot.size());
+
+ // A write after the snapshot was taken must not be visible in it (copy-on-write).
+ index.addTablet(new LocalTablet(2L), tabletMeta, true);
+ Assert.assertEquals(1, snapshot.size());
+ Assert.assertEquals(2, index.getTablets().size());
+
+ // The returned snapshot is read-only.
+ Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(new LocalTablet(3L)));
+ }
+
+ @Test
+ public void testConcurrentGetTabletsNeverThrows() throws InterruptedException {
+ // A reader repeatedly snapshots and iterates getTablets() while a writer keeps
+ // adding tablets. Copy-on-write guarantees the reader never observes a partially
+ // built list or throws ConcurrentModificationException.
+ TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ AtomicBoolean stop = new AtomicBoolean(false);
+
+ Thread writer = new Thread(() -> {
+ long id = 1000L;
+ while (!stop.get()) {
+ index.addTablet(new LocalTablet(id++), tabletMeta, true);
+ // Keep the list bounded (and exercise the clear path) so the test stays fast.
+ if (index.getTablets().size() > 64) {
+ index.clearTabletsForRestore();
+ }
+ }
+ });
+
+ Thread reader = new Thread(() -> {
+ try {
+ for (int i = 0; i < 50000 && error.get() == null; i++) {
+ for (Tablet tablet : index.getTablets()) {
+ tablet.getId();
+ }
+ }
+ } catch (Throwable t) {
+ error.set(t);
+ } finally {
+ stop.set(true);
+ }
+ });
+
+ writer.start();
+ reader.start();
+ reader.join();
+ stop.set(true);
+ writer.join();
+
+ if (error.get() != null) {
+ Assert.fail("getTablets() iteration threw under concurrent mutation: " + error.get());
+ }
+ }
+
@Test
public void testSerialization() throws Exception {
// 1. Write objects to file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index eaafb31000e..2f89dcdeea8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -38,6 +38,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class TabletTest {
@@ -118,6 +121,82 @@ public class TabletTest {
Assert.assertEquals(1, tablet.getReplicas().size());
}
+ @Test
+ public void testGetReplicasReturnsImmutableSnapshot() {
+ List<Replica> snapshot = tablet.getReplicas();
+ Assert.assertEquals(3, snapshot.size());
+
+ // A write after the snapshot was taken must not be visible in it (copy-on-write).
+ Replica replica4 = new LocalReplica(4L, 4L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0);
+ tablet.addReplica(replica4);
+ Assert.assertEquals(3, snapshot.size());
+ Assert.assertEquals(4, tablet.getReplicas().size());
+
+ // The returned snapshot is read-only.
+ Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(replica4));
+ }
+
+ @Test
+ public void testIterateReplicasWhileMutatingDoesNotThrow() {
+ // Iterating the snapshot returned by getReplicas() must not throw
+ // ConcurrentModificationException even when the tablet is structurally modified
+ // during iteration.
+ int seen = 0;
+ for (Replica r : tablet.getReplicas()) {
+ Assert.assertNotNull(r);
+ tablet.addReplica(new LocalReplica(100L + seen, 100L + seen, 100L, 0, 200000L, 0, 3000L,
+ ReplicaState.NORMAL, 0, 0));
+ tablet.deleteReplicaByBackendId(2L);
+ seen++;
+ }
+ Assert.assertEquals(3, seen);
+ }
+
+ @Test
+ public void testConcurrentGetReplicasNeverThrows() throws InterruptedException {
+ // A reader repeatedly snapshots and iterates getReplicas() while a writer keeps
+ // mutating the replica list. Copy-on-write guarantees the reader never observes a
+ // partially built list or throws ConcurrentModificationException.
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ AtomicBoolean stop = new AtomicBoolean(false);
+
+ Thread writer = new Thread(() -> {
+ long id = 1000L;
+ while (!stop.get()) {
+ // Reuse a small set of backend ids so the list stays bounded while still
+ // exercising the add/replace path.
+ long beId = id % 8;
+ tablet.addReplica(new LocalReplica(id, beId, 100L, 0, 200000L, 0, 3000L,
+ ReplicaState.NORMAL, 0, 0), true);
+ id++;
+ }
+ });
+
+ Thread reader = new Thread(() -> {
+ try {
+ for (int i = 0; i < 50000 && error.get() == null; i++) {
+ for (Replica r : tablet.getReplicas()) {
+ r.getId();
+ }
+ }
+ } catch (Throwable t) {
+ error.set(t);
+ } finally {
+ stop.set(true);
+ }
+ });
+
+ writer.start();
+ reader.start();
+ reader.join();
+ stop.set(true);
+ writer.join();
+
+ if (error.get() != null) {
+ Assert.fail("getReplicas() iteration threw under concurrent mutation: " + error.get());
+ }
+ }
+
@Test
public void testSerialization() throws Exception {
final Path path = Files.createTempFile("olapTabletTest", "tmp");
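The changes and tests in this diff rely on a volatile-snapshot copy-on-write scheme. The sketch below shows that scheme in isolation. It is a minimal, hypothetical illustration, not the Doris code: `CowSnapshotDemo`, `CowList`, `snapshot()` and `appendAll()` are illustrative names only. It assumes one monitor per list serializes all writers, while readers take no lock. It shows two things:

- A reader that iterates a snapshot cannot hit a `ConcurrentModificationException`, because each published list is never mutated again.
- Publishing a whole batch with one `appendAll` copies the list once, rather than once per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-alone sketch; CowList is NOT a Doris class.
public class CowSnapshotDemo {

    static final class CowList<T> {
        // Readers only dereference this field; a list is never modified after it is published.
        private volatile List<T> items = Collections.emptyList();

        // Lock-free read: returns the current immutable snapshot.
        List<T> snapshot() {
            return items;
        }

        // Writers serialize on this monitor, build a new list, then swap the reference.
        synchronized void add(T item) {
            List<T> next = new ArrayList<>(items.size() + 1);
            next.addAll(items);
            next.add(item);
            items = Collections.unmodifiableList(next);
        }

        // One copy for the whole batch instead of one copy per element.
        synchronized void appendAll(List<T> batch) {
            List<T> next = new ArrayList<>(items.size() + batch.size());
            next.addAll(items);
            next.addAll(batch);
            items = Collections.unmodifiableList(next);
        }

        synchronized void clear() {
            items = Collections.emptyList();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CowList<Long> list = new CowList<>();
        list.add(1L);
        List<Long> before = list.snapshot();
        list.appendAll(Arrays.asList(2L, 3L));
        // The earlier snapshot does not see the later batch.
        System.out.println(before.size() + " " + list.snapshot().size()); // prints "1 3"

        // A writer keeps mutating while the main thread iterates snapshots.
        AtomicBoolean stop = new AtomicBoolean(false);
        Thread writer = new Thread(() -> {
            long id = 100L;
            while (!stop.get()) {
                list.add(id++);
                if (list.snapshot().size() > 64) {
                    list.clear();
                }
            }
        });
        writer.start();
        long iterated = 0;
        try {
            // Each snapshot wraps an ArrayList nobody mutates, so its fail-fast iterator never trips.
            for (int i = 0; i < 10_000; i++) {
                for (Long v : list.snapshot()) {
                    iterated += (v != null) ? 1 : 0;
                }
            }
        } finally {
            stop.set(true);
            writer.join();
        }
        System.out.println("reader iterated " + iterated + " elements without a ConcurrentModificationException");
    }
}
```

Reads cost nothing beyond a volatile load. Every write copies the current list, so adding n elements one at a time copies O(n^2) elements in total. The batched append used in `createTablets` and `createCloudTablets` copies O(n) for the whole batch.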