This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d1e30df5565 [fix](cloud) Align colocate proc output and tablet health
in cloud mode (#60944)
d1e30df5565 is described below
commit d1e30df5565214d2180c5a89e69f76e2a50fba16
Author: deardeng <[email protected]>
AuthorDate: Mon Jun 1 11:14:21 2026 +0800
[fix](cloud) Align colocate proc output and tablet health in cloud mode
(#60944)
- Fix incorrect tablet health statistics in cloud mode for SHOW PROC
'/cluster_health/tablet_health': avoid reporting UNRECOVERABLE due to
local-mode health checks.
- Add cloud fallback for colocation group detail: when backend sequence
metadata is empty, derive backend ids from current tablets.
- In cloud mode, show ReplicaAllocation as null in the SHOW PROC
'/colocation_group' group listing for consistent output semantics.
- Keep local mode behavior unchanged.
---
.../apache/doris/catalog/ColocateTableIndex.java | 19 +-
.../java/org/apache/doris/catalog/Replica.java | 21 ++
.../apache/doris/cloud/catalog/CloudReplica.java | 53 +++-
.../proc/ColocationGroupBackendSeqsProcNode.java | 24 +-
.../doris/common/proc/ColocationGroupProcDir.java | 158 ++++++++++-
.../doris/common/proc/TabletHealthProcDir.java | 7 +-
.../common/proc/CloudProcVersionDisplayTest.java | 16 +-
.../common/proc/ColocationGroupProcDirTest.java | 294 +++++++++++++++++++++
.../doris/common/proc/TabletHealthProcDirTest.java | 110 ++++++++
9 files changed, 679 insertions(+), 23 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 9fa53d06210..06d45ba763a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -437,6 +437,19 @@ public class ColocateTableIndex implements Writable {
}
}
+ public Long getDbIdByTblIdNullable(GroupId groupId, long tableId) {
+ readLock();
+ try {
+ GroupId tableGroupId = table2Group.get(tableId);
+ if (tableGroupId == null || !tableGroupId.equals(groupId)) {
+ return null;
+ }
+ return tableGroupId.tblId2DbId.get(tableId);
+ } finally {
+ readUnlock();
+ }
+ }
+
public Set<GroupId> getAllGroupIds() {
readLock();
try {
@@ -711,7 +724,11 @@ public class ColocateTableIndex implements Writable {
info.add(Joiner.on(", ").join(group2Tables.get(groupId)));
ColocateGroupSchema groupSchema = group2Schema.get(groupId);
info.add(String.valueOf(groupSchema.getBucketsNum()));
-
info.add(String.valueOf(groupSchema.getReplicaAlloc().toCreateStmt()));
+ if (Config.isCloudMode()) {
+ info.add("null");
+ } else {
+
info.add(String.valueOf(groupSchema.getReplicaAlloc().toCreateStmt()));
+ }
List<String> cols =
groupSchema.getDistributionColTypes().stream().map(
e -> e.toSql()).collect(Collectors.toList());
info.add(Joiner.on(", ").join(cols));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index da08e535438..5b1fb2ede26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -29,6 +29,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* This class represents the olap replica related metadata.
@@ -179,6 +182,24 @@ public abstract class Replica {
return -1L;
}
+ public long getBackendIdForProcDisplay() {
+ return getBackendIdValue();
+ }
+
+ // For proc display only. Returns a "scope key -> backendId" mapping used
to render the
+ // replica's placement. In local deployment there is a single scope (empty
key) and the
+ // cache is unused; in cloud deployment each compute group is a separate
scope and the
+ // cache lets a single proc call fetch each compute group's backends once
(see
+ // CloudReplica). The cache is keyed by compute group id.
+ public Map<String, Long> getClusterToBackendForProcDisplay(Map<String,
List<Backend>> computeGroupBackendCache) {
+ Map<String, Long> result = new HashMap<>();
+ long backendId = getBackendIdForProcDisplay();
+ if (backendId != -1L) {
+ result.put("", backendId);
+ }
+ return result;
+ }
+
public void setBackendId(long backendId) {
if (backendId != -1) {
throw new UnsupportedOperationException("setBackendId is not
supported in Replica");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 22f2464e48d..9568825cc4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -44,6 +44,7 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -111,8 +112,17 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
}
public long getColocatedBeId(String clusterId) throws
ComputeGroupException {
+ List<Backend> clusterBackends = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getBackendsByClusterId(clusterId);
+ return getColocatedBeId(clusterId, clusterBackends);
+ }
+
+ // Same as getColocatedBeId(clusterId) but reuses an already fetched
backend list of
+ // the compute group. Lets callers that resolve many replicas across the
same compute
+ // groups (e.g. the colocate proc display) fetch each group's backends
only once.
+ public long getColocatedBeId(String clusterId, List<Backend>
clusterBackends) throws ComputeGroupException {
CloudSystemInfoService infoService = ((CloudSystemInfoService)
Env.getCurrentSystemInfo());
- List<Backend> bes =
infoService.getBackendsByClusterId(clusterId).stream()
+ List<Backend> bes = clusterBackends.stream()
.filter(be ->
be.isQueryAvailable()).collect(Collectors.toList());
String clusterName = infoService.getClusterNameByClusterId(clusterId);
if (bes.isEmpty()) {
@@ -216,6 +226,47 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
return primaryClusterToBackend.getOrDefault(clusterId, -1L);
}
+ // For proc display only. In cloud mode a replica is hashed to a different
BE in each
+ // compute group, so expose a clusterId -> backendId mapping; the proc
display builds
+ // a separate bucket sequence per compute group from it so each group's
sequence is
+ // self-consistent. Do not collapse this into a single BE (e.g. the first
one):
+ // backends differ across compute groups and would not match.
+ //
+ // ATTN: colocated replicas do NOT use primaryClusterToBackend (see
getBackendIdImpl /
+ // getClusterPrimaryBackendId, which short-circuit to getColocatedBeId),
so that cache
+ // is empty for them. Resolve their placement per compute group on the fly
instead.
+ // This reads CloudSystemInfoService / the colocate index, so callers must
invoke it
+ // OUTSIDE any table lock to avoid nested lock acquisition. It does not
auto-start any
+ // compute group: getColocatedBeId only reads the already-known backends
of a clusterId.
+ // computeGroupBackendCache maps compute group id ->
getBackendsByClusterId() result and
+ // is shared across all replicas resolved in a single proc call, so each
compute group's
+ // backend list is fetched only once instead of once per replica.
+ @Override
+ public Map<String, Long> getClusterToBackendForProcDisplay(Map<String,
List<Backend>> computeGroupBackendCache) {
+ if (!isColocated()) {
+ return new HashMap<>(primaryClusterToBackend);
+ }
+ Map<String, Long> result = new HashMap<>();
+ CloudSystemInfoService infoService = (CloudSystemInfoService)
Env.getCurrentSystemInfo();
+ for (String clusterId : infoService.getCloudClusterIds()) {
+ try {
+ List<Backend> clusterBackends =
+ computeGroupBackendCache.computeIfAbsent(clusterId,
infoService::getBackendsByClusterId);
+ long backendId = getColocatedBeId(clusterId, clusterBackends);
+ if (backendId != -1L) {
+ result.put(clusterId, backendId);
+ }
+ } catch (ComputeGroupException e) {
+ // Skip compute groups that currently have no available
backend.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip compute group {} for colocate proc
display, replica {}",
+ clusterId, getId(), e);
+ }
+ }
+ }
+ return result;
+ }
+
private String getCurrentClusterId() throws ComputeGroupException {
// Not in a connect session
String cluster = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupBackendSeqsProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupBackendSeqsProcNode.java
index 4ef502b1193..a2a7caeaa31 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupBackendSeqsProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupBackendSeqsProcNode.java
@@ -18,7 +18,6 @@
package org.apache.doris.common.proc;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.resource.Tag;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@@ -30,10 +29,13 @@ import java.util.Map;
* show proc "/colocation_group/group_name";
*/
public class ColocationGroupBackendSeqsProcNode implements ProcNodeInterface {
- private Map<Tag, List<List<Long>>> backendsSeq;
+ // Column name -> per-bucket backend id sequence. The column name is a
resource Tag in
+ // local mode, a compute group name in cloud mode, or "BackendIds" when
there is no
+ // per-scope breakdown.
+ private Map<String, List<List<Long>>> backendsSeqByColumn;
- public ColocationGroupBackendSeqsProcNode(Map<Tag, List<List<Long>>>
backendsSeq) {
- this.backendsSeq = backendsSeq;
+ public ColocationGroupBackendSeqsProcNode(Map<String, List<List<Long>>>
backendsSeqByColumn) {
+ this.backendsSeqByColumn = backendsSeqByColumn;
}
@Override
@@ -42,21 +44,21 @@ public class ColocationGroupBackendSeqsProcNode implements
ProcNodeInterface {
List<String> titleNames = Lists.newArrayList();
titleNames.add("BucketIndex");
int bucketNum = 0;
- for (Tag tag : backendsSeq.keySet()) {
- titleNames.add(tag.toString());
+ for (String column : backendsSeqByColumn.keySet()) {
+ titleNames.add(column);
if (bucketNum == 0) {
- bucketNum = backendsSeq.get(tag).size();
- } else if (bucketNum != backendsSeq.get(tag).size()) {
+ bucketNum = backendsSeqByColumn.get(column).size();
+ } else if (bucketNum != backendsSeqByColumn.get(column).size()) {
throw new AnalysisException("Invalid bucket number: "
- + bucketNum + " vs. " + backendsSeq.get(tag).size());
+ + bucketNum + " vs. " +
backendsSeqByColumn.get(column).size());
}
}
result.setNames(titleNames);
for (int i = 0; i < bucketNum; i++) {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(i)); // bucket index
- for (Tag tag : backendsSeq.keySet()) {
- List<List<Long>> bucketBackends = backendsSeq.get(tag);
+ for (String column : backendsSeqByColumn.keySet()) {
+ List<List<Long>> bucketBackends =
backendsSeqByColumn.get(column);
info.add(Joiner.on(", ").join(bucketBackends.get(i)));
}
result.addRow(info);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java
index 85dd2c97be6..b2569c44919 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java
@@ -19,14 +19,30 @@ package org.apache.doris.common.proc;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/*
* show proc "/colocation_group";
@@ -61,7 +77,22 @@ public class ColocationGroupProcDir implements
ProcDirInterface {
GroupId groupId = new GroupId(dbId, grpId);
ColocateTableIndex index = Env.getCurrentColocateIndex();
Map<Tag, List<List<Long>>> beSeqs =
index.getBackendsPerBucketSeq(groupId);
- return new ColocationGroupBackendSeqsProcNode(beSeqs);
+ Map<String, List<List<Long>>> columns;
+ if ((beSeqs == null || beSeqs.isEmpty()) && Config.isCloudMode()) {
+ // In cloud mode, legacy backend sequence metadata may be empty.
Derive the
+ // sequence from current tablets, one column per compute group.
This path must
+ // not resolve cloud backends in a way that auto-starts a compute
group.
+ columns = getCloudBackendSeqsFromTablets(groupId, index);
+ } else {
+ // Local mode: one column per resource tag.
+ columns = Maps.newLinkedHashMap();
+ if (beSeqs != null) {
+ for (Map.Entry<Tag, List<List<Long>>> entry :
beSeqs.entrySet()) {
+ columns.put(entry.getKey().toString(), entry.getValue());
+ }
+ }
+ }
+ return new ColocationGroupBackendSeqsProcNode(columns);
}
@Override
@@ -74,4 +105,129 @@ public class ColocationGroupProcDir implements
ProcDirInterface {
result.setRows(infos);
return result;
}
+
+ private Map<String, List<List<Long>>>
getCloudBackendSeqsFromTablets(GroupId groupId, ColocateTableIndex index) {
+ Map<String, List<List<Long>>> backendsSeq = Maps.newLinkedHashMap();
+ List<Long> tableIds = index.getAllTableIds(groupId);
+ for (Long tableId : tableIds) {
+ long dbId = groupId.dbId;
+ if (dbId == 0) {
+ Long tableDbId = index.getDbIdByTblIdNullable(groupId,
tableId);
+ if (tableDbId == null) {
+ continue;
+ }
+ dbId = tableDbId;
+ }
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ continue;
+ }
+ Table table = db.getTableNullable(tableId);
+ if (!(table instanceof OlapTable)) {
+ continue;
+ }
+ backendsSeq = getCloudBackendSeqsFromTable((OlapTable) table);
+ if (!backendsSeq.isEmpty()) {
+ return backendsSeq;
+ }
+ }
+ return backendsSeq;
+ }
+
+ private Map<String, List<List<Long>>>
getCloudBackendSeqsFromTable(OlapTable olapTable) {
+ // Snapshot replicas (ordered by bucket) under the table lock only.
Resolving the
+ // per-compute-group placement of colocate cloud replicas calls into
+ // CloudSystemInfoService / the colocate index, which must run outside
the table
+ // lock to avoid nested lock acquisition.
+ List<List<Replica>> bucketReplicas = Lists.newArrayList();
+ olapTable.readLock();
+ try {
+ Partition firstPartition = null;
+ for (Partition partition : olapTable.getAllPartitions()) {
+ firstPartition = partition;
+ break;
+ }
+ if (firstPartition == null) {
+ return Maps.newLinkedHashMap();
+ }
+ MaterializedIndex baseIndex = firstPartition.getBaseIndex();
+ for (Tablet tablet : baseIndex.getTablets()) {
+ bucketReplicas.add(new ArrayList<>(tablet.getReplicas()));
+ }
+ } finally {
+ olapTable.readUnlock();
+ }
+
+ // Resolve each replica's per-compute-group placement outside the
table lock. In
+ // cloud mode a replica is hashed to a different BE in each compute
group, so build
+ // a separate bucket sequence per compute group. Merging across groups
(picking an
+ // arbitrary first BE) would mix BEs from different compute groups
into one bucket
+ // sequence, which is meaningless. For colocate cloud tables placement
is computed
+ // on the fly; otherwise it comes from the cached clusterId ->
backendId map (or an
+ // empty scope key for local-style replicas).
+ List<List<Map<String, Long>>> tabletReplicaBackends =
Lists.newArrayListWithCapacity(bucketReplicas.size());
+ Set<String> scopeKeys = Sets.newLinkedHashSet();
+ // Shared across all replicas in this proc call so each compute
group's backend
+ // list is fetched only once (colocate placement is resolved per
compute group).
+ Map<String, List<Backend>> computeGroupBackendCache =
Maps.newHashMap();
+ for (List<Replica> replicas : bucketReplicas) {
+ List<Map<String, Long>> replicaBackends = new ArrayList<>();
+ for (Replica replica : replicas) {
+ Map<String, Long> clusterToBackend =
+
replica.getClusterToBackendForProcDisplay(computeGroupBackendCache);
+ replicaBackends.add(clusterToBackend);
+ scopeKeys.addAll(clusterToBackend.keySet());
+ }
+ tabletReplicaBackends.add(replicaBackends);
+ }
+
+ Map<String, List<List<Long>>> seqByScopeKey = Maps.newLinkedHashMap();
+ for (String scopeKey : scopeKeys) {
+ List<List<Long>> bucketSeq =
Lists.newArrayListWithCapacity(tabletReplicaBackends.size());
+ boolean hasBackend = false;
+ for (List<Map<String, Long>> replicaBackends :
tabletReplicaBackends) {
+ List<Long> bucketBackends = new ArrayList<>();
+ for (Map<String, Long> clusterToBackend : replicaBackends) {
+ Long backendId = clusterToBackend.get(scopeKey);
+ if (backendId == null || backendId < 0) {
+ continue;
+ }
+ bucketBackends.add(backendId);
+ hasBackend = true;
+ }
+ bucketSeq.add(bucketBackends);
+ }
+ if (hasBackend) {
+ seqByScopeKey.put(scopeKey, bucketSeq);
+ }
+ }
+
+ // Resolve scope keys to display column names (also outside the table
lock): name
+ // resolution acquires CloudSystemInfoService's lock.
+ Map<String, List<List<Long>>> backendsSeq = Maps.newLinkedHashMap();
+ for (Map.Entry<String, List<List<Long>>> entry :
seqByScopeKey.entrySet()) {
+ backendsSeq.put(scopeKeyToColumnName(entry.getKey()),
entry.getValue());
+ }
+ return backendsSeq;
+ }
+
+ // Map a proc-display scope key to its column name. An empty key means
there is no
+ // per-compute-group breakdown (local-style replicas), shown as a single
"BackendIds"
+ // column. Otherwise the key is a cloud compute group id, shown by its
compute group
+ // name (falling back to the raw id when the name cannot be resolved).
+ private String scopeKeyToColumnName(String scopeKey) {
+ if (Strings.isNullOrEmpty(scopeKey)) {
+ return "BackendIds";
+ }
+ try {
+ String name = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getClusterNameByClusterId(scopeKey);
+ if (!Strings.isNullOrEmpty(name)) {
+ return name;
+ }
+ } catch (Exception e) {
+ // Fall back to the raw compute group id if name resolution is
unavailable.
+ }
+ return scopeKey;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index b1be9c79ab6..46c41dabebc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -199,7 +199,12 @@ public class TabletHealthProcDir implements
ProcDirInterface {
Tablet tablet = tablets.get(i);
++tabletNum;
Tablet.TabletStatus res = null;
- if (groupId != null) {
+ if (Config.isCloudMode()) {
+ // In cloud mode, tablet replica health is
managed by cloud components.
+ // getHealth/getColocateHealth follows
local deployment logic and may
+ // misclassify tablets as UNRECOVERABLE.
+ res = Tablet.TabletStatus.HEALTHY;
+ } else if (groupId != null) {
ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
if (groupSchema != null) {
replicaAlloc =
groupSchema.getReplicaAlloc();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/CloudProcVersionDisplayTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CloudProcVersionDisplayTest.java
index 57c7df79b18..1957e565787 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/CloudProcVersionDisplayTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CloudProcVersionDisplayTest.java
@@ -69,7 +69,7 @@ public class CloudProcVersionDisplayTest {
private SystemInfoService systemInfoService;
private TabletInvertedIndex invertedIndex;
private InternalCatalog internalCatalog;
- private MockedStatic<Env> envStatic;
+ private MockedStatic<Env> mockedEnv;
private String originDeployMode;
private String originCloudUniqueId;
@@ -92,15 +92,15 @@ public class CloudProcVersionDisplayTest {
Mockito.when(env.isReady()).thenReturn(true);
Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of());
- envStatic = Mockito.mockStatic(Env.class, Mockito.CALLS_REAL_METHODS);
- envStatic.when(Env::getServingEnv).thenReturn(env);
-
envStatic.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+ mockedEnv = Mockito.mockStatic(Env.class, Mockito.CALLS_REAL_METHODS);
+ mockedEnv.when(Env::getServingEnv).thenReturn(env);
+
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
}
@After
public void tearDown() {
- if (envStatic != null) {
- envStatic.close();
+ if (mockedEnv != null) {
+ mockedEnv.close();
}
Config.deploy_mode = originDeployMode;
Config.cloud_unique_id = originCloudUniqueId;
@@ -124,9 +124,9 @@ public class CloudProcVersionDisplayTest {
public void testReplicasProcNodeShowsPartitionCachedVersionInCloudMode()
throws AnalysisException {
ProcTestContext context = createProcTestContext();
- envStatic.when(Env::getCurrentInvertedIndex).thenReturn(invertedIndex);
-
envStatic.when(Env::getCurrentInternalCatalog).thenReturn(internalCatalog);
+ mockedEnv.when(Env::getCurrentInvertedIndex).thenReturn(invertedIndex);
Mockito.when(invertedIndex.getTabletMeta(TABLET_ID)).thenReturn(context.tabletMeta);
+
mockedEnv.when(Env::getCurrentInternalCatalog).thenReturn(internalCatalog);
Mockito.when(internalCatalog.getDbNullable(DB_ID)).thenReturn(context.db);
ReplicasProcNode procNode = new ReplicasProcNode(TABLET_ID,
context.tablet.getReplicas());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/ColocationGroupProcDirTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/ColocationGroupProcDirTest.java
new file mode 100644
index 00000000000..bfb53825947
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/ColocationGroupProcDirTest.java
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Map;
+
+public class ColocationGroupProcDirTest extends TestWithFeService {
+ private Database db;
+
+ @Override
+ protected int backendNum() {
+ return 1;
+ }
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ useDatabase("test");
+ db = Env.getCurrentInternalCatalog().getDbOrMetaException("test");
+ }
+
+ @Override
+ protected void runBeforeEach() throws Exception {
+ for (Table table : db.getTables()) {
+ dropTable(table.getName(), true);
+ }
+ }
+
+ @Test
+ public void testLocalColocationGroupDetailKeepsTagColumns() throws
Exception {
+ Tag tag1 = Tag.create(Tag.TYPE_LOCATION, "tag1");
+ Tag tag2 = Tag.create(Tag.TYPE_LOCATION, "tag2");
+ Map<String, List<List<Long>>> backendsSeq = Maps.newLinkedHashMap();
+ backendsSeq.put(tag1.toString(), Lists.newArrayList(
+ Lists.newArrayList(10001L, 10002L),
+ Lists.newArrayList(10003L)));
+ backendsSeq.put(tag2.toString(), Lists.newArrayList(
+ Lists.newArrayList(20001L),
+ Lists.newArrayList(20002L, 20003L)));
+
+ ProcResult result = new
ColocationGroupBackendSeqsProcNode(backendsSeq).fetchResult();
+
+ Assertions.assertEquals(Lists.newArrayList("BucketIndex",
tag1.toString(), tag2.toString()),
+ result.getColumnNames());
+ Assertions.assertEquals(Lists.newArrayList("0", "10001, 10002",
"20001"), result.getRows().get(0));
+ Assertions.assertEquals(Lists.newArrayList("1", "10003", "20002,
20003"), result.getRows().get(1));
+ }
+
+ @Test
+ public void testCloudColocationGroupDetailWithoutTag() throws Exception {
+ String originDeployMode = Config.deploy_mode;
+ createTable("CREATE TABLE colocate_t1 (k INT) DISTRIBUTED BY HASH(k)
BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'g1')");
+ createTable("CREATE TABLE colocate_t2 (k INT) DISTRIBUTED BY HASH(k)
BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'g1')");
+
+ OlapTable table1 = (OlapTable)
db.getTableOrMetaException("colocate_t1");
+ GroupId groupId =
Env.getCurrentColocateIndex().getGroup(table1.getId());
+ Assertions.assertNotNull(groupId);
+
+ ColocateTableIndex colocateTableIndex =
Mockito.spy(Env.getCurrentColocateIndex());
+ Mockito.doReturn(Maps.<Tag,
List<List<Long>>>newHashMap()).when(colocateTableIndex)
+ .getBackendsPerBucketSeq(groupId);
+ Config.deploy_mode = "cloud";
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class,
Mockito.CALLS_REAL_METHODS)) {
+
mockedEnv.when(Env::getCurrentColocateIndex).thenReturn(colocateTableIndex);
+ ProcNodeInterface node = new
ColocationGroupProcDir().lookup(groupId.toString());
+ ProcResult result = node.fetchResult();
+ Assertions.assertEquals(Lists.newArrayList("BucketIndex",
"BackendIds"), result.getColumnNames());
+ Assertions.assertFalse(result.getRows().isEmpty());
+ Assertions.assertTrue(result.getRows().stream().anyMatch(row ->
row.size() == 2 && !row.get(1).isEmpty()));
+ } finally {
+ Config.deploy_mode = originDeployMode;
+ }
+ }
+
+ @Test
+ public void testCloudGlobalColocationGroupDetailFallback() throws
Exception {
+ String originDeployMode = Config.deploy_mode;
+ createTable("CREATE TABLE global_colocate_t1 (k INT) DISTRIBUTED BY
HASH(k) BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'__global__g1')");
+
+ OlapTable table1 = (OlapTable)
db.getTableOrMetaException("global_colocate_t1");
+ GroupId groupId =
Env.getCurrentColocateIndex().getGroup(table1.getId());
+ Assertions.assertEquals(0L, groupId.dbId.longValue());
+
+ ColocateTableIndex colocateTableIndex =
Mockito.spy(Env.getCurrentColocateIndex());
+ Mockito.doReturn(Maps.<Tag,
List<List<Long>>>newHashMap()).when(colocateTableIndex)
+ .getBackendsPerBucketSeq(groupId);
+ Config.deploy_mode = "cloud";
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class,
Mockito.CALLS_REAL_METHODS)) {
+
mockedEnv.when(Env::getCurrentColocateIndex).thenReturn(colocateTableIndex);
+ ProcNodeInterface node = new
ColocationGroupProcDir().lookup(groupId.toString());
+ ProcResult result = node.fetchResult();
+ Assertions.assertEquals(Lists.newArrayList("BucketIndex",
"BackendIds"), result.getColumnNames());
+ Assertions.assertFalse(result.getRows().isEmpty());
+ Assertions.assertTrue(result.getRows().stream().anyMatch(row ->
row.size() == 2 && !row.get(1).isEmpty()));
+ } finally {
+ Config.deploy_mode = originDeployMode;
+ }
+ }
+
+ @Test
+ public void
testCloudColocationGroupDetailFallbackSkipsUnusableFirstTable() throws
Exception {
+ String originDeployMode = Config.deploy_mode;
+ createTable("CREATE TABLE colocate_t5 (k INT) DISTRIBUTED BY HASH(k)
BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'g3')");
+ createTable("CREATE TABLE colocate_t6 (k INT) DISTRIBUTED BY HASH(k)
BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'g3')");
+
+ OlapTable table1 = (OlapTable)
db.getTableOrMetaException("colocate_t5");
+ GroupId groupId =
Env.getCurrentColocateIndex().getGroup(table1.getId());
+ db.unregisterTable(table1.getId());
+
+ ColocateTableIndex colocateTableIndex =
Mockito.spy(Env.getCurrentColocateIndex());
+ Mockito.doReturn(Maps.<Tag,
List<List<Long>>>newHashMap()).when(colocateTableIndex)
+ .getBackendsPerBucketSeq(groupId);
+ Config.deploy_mode = "cloud";
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class,
Mockito.CALLS_REAL_METHODS)) {
+
mockedEnv.when(Env::getCurrentColocateIndex).thenReturn(colocateTableIndex);
+ ProcNodeInterface node = new
ColocationGroupProcDir().lookup(groupId.toString());
+ ProcResult result = node.fetchResult();
+ Assertions.assertEquals(Lists.newArrayList("BucketIndex",
"BackendIds"), result.getColumnNames());
+ Assertions.assertFalse(result.getRows().isEmpty());
+ Assertions.assertTrue(result.getRows().stream().anyMatch(row ->
row.size() == 2 && !row.get(1).isEmpty()));
+ } finally {
+ db.registerTable(table1);
+ Config.deploy_mode = originDeployMode;
+ }
+ }
+
+ @Test
+ public void testCloudColocationGroupReplicaAllocationIsNull() throws
Exception {
+ String originDeployMode = Config.deploy_mode;
+ createTable("CREATE TABLE colocate_t3 (k INT) DISTRIBUTED BY HASH(k)
BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'g2')");
+ createTable("CREATE TABLE colocate_t4 (k INT) DISTRIBUTED BY HASH(k)
BUCKETS 2 "
+ + "PROPERTIES ('replication_num' = '1', 'colocate_with' =
'g2')");
+
+ Config.deploy_mode = "cloud";
+ try {
+ ProcResult result = new ColocationGroupProcDir().fetchResult();
+ int groupNameIdx =
ColocationGroupProcDir.TITLE_NAMES.indexOf("GroupName");
+ int replicaAllocIdx =
ColocationGroupProcDir.TITLE_NAMES.indexOf("ReplicaAllocation");
+ List<String> groupRow = result.getRows().stream()
+ .filter(row -> "test.g2".equals(row.get(groupNameIdx)))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("can not find
colocate group test.g2"));
+ Assertions.assertEquals("null", groupRow.get(replicaAllocIdx));
+ } finally {
+ Config.deploy_mode = originDeployMode;
+ }
+ }
+
+ @Test // getClusterToBackendForProcDisplay is empty at first, then holds one entry per updateClusterToPrimaryBe call.
+ public void testCloudReplicaProcDisplayExposesPerComputeGroup() {
+ CloudReplica replica = new CloudReplica(1L, null, ReplicaState.NORMAL,
1L, 1,
+ db.getId(), 2L, 3L, 4L, 0L);
+
+
Assertions.assertTrue(replica.getClusterToBackendForProcDisplay(Maps.newHashMap()).isEmpty()); // nothing recorded yet
+
+ replica.updateClusterToPrimaryBe("cluster_a", 10001L); // record a backend id under two distinct cluster names
+ replica.updateClusterToPrimaryBe("cluster_b", 20001L);
+
+ Map<String, Long> clusterToBackend =
replica.getClusterToBackendForProcDisplay(Maps.newHashMap());
+ Assertions.assertEquals(2, clusterToBackend.size()); // both recorded clusters must be reported
+ Assertions.assertEquals(Long.valueOf(10001L),
clusterToBackend.get("cluster_a"));
+ Assertions.assertEquals(Long.valueOf(20001L),
clusterToBackend.get("cluster_b"));
+ }
+
+ @Test // Colocate replica: the display map holds cg_a from getColocatedBeId and omits cg_b, whose lookup throws.
+ public void testCloudColocatedReplicaProcDisplayResolvesPerComputeGroup()
throws Exception {
+ // A colocate cloud replica does not use primaryClusterToBackend
(placement is
+ // resolved on the fly via getColocatedBeId), so that cache stays
empty. The proc
+ // display must still produce one backend per compute group instead of
nothing.
+ CloudReplica replica = Mockito.spy(new CloudReplica(1L, null,
ReplicaState.NORMAL, 1L, 1,
+ db.getId(), 2L, 3L, 4L, 0L));
+
+ ColocateTableIndex colocateTableIndex =
Mockito.mock(ColocateTableIndex.class);
+
Mockito.when(colocateTableIndex.isColocateTableNoLock(2L)).thenReturn(true); // 2L presumably matches the table id given to the constructor above; TODO confirm
+ CloudSystemInfoService infoService =
Mockito.mock(CloudSystemInfoService.class);
+
Mockito.when(infoService.getCloudClusterIds()).thenReturn(Lists.newArrayList("cg_a",
"cg_b"));
+ // Backend list is resolved through the per-clusterId cache; return a
non-null list
+ // so the cache stores it (placement itself is stubbed on
getColocatedBeId below).
+
Mockito.when(infoService.getBackendsByClusterId(Mockito.anyString())).thenReturn(Lists.newArrayList());
+
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class,
Mockito.CALLS_REAL_METHODS)) {
+
mockedEnv.when(Env::getCurrentColocateIndex).thenReturn(colocateTableIndex);
+ mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(infoService); // Env statics are stubbed only inside this try block; unstubbed ones call through
+
Mockito.doReturn(10001L).when(replica).getColocatedBeId(Mockito.eq("cg_a"),
Mockito.any());
+ // cg_b currently has no available backend; it must be skipped,
not fail.
+ Mockito.doThrow(new ComputeGroupException("no alive be",
+
ComputeGroupException.FailedTypeEnum.COMPUTE_GROUPS_NO_ALIVE_BE))
+ .when(replica).getColocatedBeId(Mockito.eq("cg_b"),
Mockito.any());
+
+ Map<String, Long> clusterToBackend =
+
replica.getClusterToBackendForProcDisplay(Maps.newHashMap());
+ Assertions.assertEquals(1, clusterToBackend.size()); // only cg_a resolved; the throwing cg_b contributes no entry
+ Assertions.assertEquals(Long.valueOf(10001L),
clusterToBackend.get("cg_a"));
+ Assertions.assertFalse(clusterToBackend.containsKey("cg_b"));
+ }
+ }
+
+ @Test // Two replicas passed the same cache map must cause exactly one getBackendsByClusterId call for cg_a.
+ public void testColocateProcDisplayCachesBackendsPerComputeGroup() throws
Exception {
+ // A shared cache makes a single proc call fetch each compute group's
backend list
+ // only once, even when many replicas resolve placement across the
same groups.
+ CloudReplica replica1 = Mockito.spy(new CloudReplica(1L, null,
ReplicaState.NORMAL, 1L, 1,
+ db.getId(), 2L, 3L, 4L, 0L));
+ CloudReplica replica2 = Mockito.spy(new CloudReplica(2L, null,
ReplicaState.NORMAL, 1L, 1,
+ db.getId(), 2L, 3L, 4L, 1L));
+
+ ColocateTableIndex colocateTableIndex =
Mockito.mock(ColocateTableIndex.class);
+
Mockito.when(colocateTableIndex.isColocateTableNoLock(2L)).thenReturn(true);
+ CloudSystemInfoService infoService =
Mockito.mock(CloudSystemInfoService.class);
+
Mockito.when(infoService.getCloudClusterIds()).thenReturn(Lists.newArrayList("cg_a")); // a single compute group is enough to observe the lookup count
+
Mockito.when(infoService.getBackendsByClusterId(Mockito.anyString())).thenReturn(Lists.newArrayList());
+
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class,
Mockito.CALLS_REAL_METHODS)) {
+
mockedEnv.when(Env::getCurrentColocateIndex).thenReturn(colocateTableIndex);
+ mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(infoService); // Env statics are stubbed only inside this try block; unstubbed ones call through
+
Mockito.doReturn(10001L).when(replica1).getColocatedBeId(Mockito.eq("cg_a"),
Mockito.any());
+
Mockito.doReturn(10002L).when(replica2).getColocatedBeId(Mockito.eq("cg_a"),
Mockito.any());
+
+ Map<String, List<Backend>> computeGroupBackendCache =
Maps.newHashMap(); // the same map instance is handed to both replicas below
+ Assertions.assertEquals(Long.valueOf(10001L),
+
replica1.getClusterToBackendForProcDisplay(computeGroupBackendCache).get("cg_a"));
+ Assertions.assertEquals(Long.valueOf(10002L),
+
replica2.getClusterToBackendForProcDisplay(computeGroupBackendCache).get("cg_a"));
+
+ // Fetched once for cg_a despite two replicas resolving against it.
+ Mockito.verify(infoService,
Mockito.times(1)).getBackendsByClusterId("cg_a");
+ }
+ }
+
+ @Test // ColocationGroupBackendSeqsProcNode must render BucketIndex plus one column per map key, one row per bucket.
+ public void testColocationGroupDetailPerComputeGroupColumns() throws
Exception {
+ // Two compute groups each get their own column (named by the compute
group), and
+ // within a compute group the per-bucket backend sequence is
self-consistent (not
+ // mixed across groups).
+ Map<String, List<List<Long>>> backendsSeq = Maps.newLinkedHashMap(); // insertion-ordered: cg_a is put before cg_b, matching the column order asserted below
+ backendsSeq.put("cg_a", Lists.newArrayList(
+ Lists.newArrayList(10001L),
+ Lists.newArrayList(10002L)));
+ backendsSeq.put("cg_b", Lists.newArrayList(
+ Lists.newArrayList(20001L),
+ Lists.newArrayList(20002L)));
+
+ ProcResult result = new
ColocationGroupBackendSeqsProcNode(backendsSeq).fetchResult();
+
+ Assertions.assertEquals(Lists.newArrayList("BucketIndex", "cg_a",
"cg_b"),
+ result.getColumnNames()); // header: bucket index first, then one column per compute group key
+ Assertions.assertEquals(Lists.newArrayList("0", "10001", "20001"),
result.getRows().get(0)); // row for bucket 0 pairs the first backend list of each group
+ Assertions.assertEquals(Lists.newArrayList("1", "10002", "20002"),
result.getRows().get(1)); // row for bucket 1 pairs the second backend list of each group
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/TabletHealthProcDirTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/TabletHealthProcDirTest.java
new file mode 100644
index 00000000000..bd15b33930e
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/TabletHealthProcDirTest.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.common.Config;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TabletHealthProcDirTest extends TestWithFeService { // Checks TabletHealthProcDir statistics when Config.deploy_mode is cloud.
+ private Database db; // handle to database test, assigned once in runBeforeAll
+
+ @Override
+ protected int backendNum() { // presumably the number of backends the TestWithFeService harness starts; TODO confirm
+ return 1;
+ }
+
+ @Override
+ protected void runBeforeAll() throws Exception { // creates and selects database test, then caches its Database object
+ createDatabase("test");
+ useDatabase("test");
+ db = Env.getCurrentInternalCatalog().getDbOrMetaException("test");
+ }
+
+ @Override
+ protected void runBeforeEach() throws Exception { // drops every table currently in the database
+ for (Table table : db.getTables()) {
+ dropTable(table.getName(), true);
+ }
+ }
+
+ @Test // A tablet whose getHealth status is UNRECOVERABLE before the mode switch must be counted healthy in cloud mode.
+ public void testCloudModeCountsTabletAsHealthy() throws Exception {
+ String originDeployMode = Config.deploy_mode; // remembered so the finally block can put it back
+ createTable("CREATE TABLE tbl_proc_health (k INT) DISTRIBUTED BY
HASH(k) BUCKETS 1"
+ + " PROPERTIES ('replication_num' = '1')");
+
+ OlapTable table = (OlapTable)
db.getTableOrMetaException("tbl_proc_health");
+ Partition partition = table.getPartitions().iterator().next();
+ Tablet tablet =
partition.getMaterializedIndices(IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+ partition.updateVisibleVersion(10L); // partition and replica are both moved to version 10
+ Replica replica = tablet.getReplicas().get(0);
+ replica.updateVersion(10L);
+ replica.setBad(true); // the first replica is flagged bad; the assertion below expects UNRECOVERABLE as a result
+
+ TabletStatus localStatus =
tablet.getHealth(Env.getCurrentSystemInfo(), partition.getVisibleVersion(),
+
table.getPartitionInfo().getReplicaAllocation(partition.getId()),
+ Env.getCurrentSystemInfo().getAllBackendIds(true)).status;
+ Assertions.assertEquals(TabletStatus.UNRECOVERABLE, localStatus); // baseline taken before deploy_mode is changed
+
+ Config.deploy_mode = "cloud";
+ try (MockedStatic<Partition> mockedPartition =
Mockito.mockStatic(Partition.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedPartition.when(() ->
Partition.getVisibleVersions(Mockito.anyList())).thenAnswer(invocation -> {
+ List<? extends Partition> partitions =
invocation.getArgument(0);
+ return
partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList());
+ }); // stub: the static batch lookup answers with each partition own getVisibleVersion value
+ TabletHealthProcDir.DBTabletStatistic statistic = new
TabletHealthProcDir.DBTabletStatistic(db);
+ List<String> row = statistic.toRow();
+ int tabletNumIdx =
TabletHealthProcDir.TITLE_NAMES.indexOf("TabletNum");
+ int healthyNumIdx =
TabletHealthProcDir.TITLE_NAMES.indexOf("HealthyNum");
+ int unrecoverableNumIdx =
TabletHealthProcDir.TITLE_NAMES.indexOf("UnrecoverableNum");
+
+ Assertions.assertEquals("1", row.get(tabletNumIdx)); // the single tablet is counted
+ Assertions.assertEquals("1", row.get(healthyNumIdx)); // and reported healthy despite the bad replica
+ Assertions.assertEquals("0", row.get(unrecoverableNumIdx));
+ } finally {
+ Config.deploy_mode = originDeployMode; // restore the static config even when an assertion fails
+ }
+ }
+
+ private List<String> findDbRow(ProcResult result, long dbId) { // returns the first row whose DbId cell equals dbId, else throws AssertionError. NOTE(review): no caller is visible in this class; candidate for removal
+ int dbIdIdx = TabletHealthProcDir.TITLE_NAMES.indexOf("DbId");
+ return result.getRows().stream()
+ .filter(row -> String.valueOf(dbId).equals(row.get(dbIdIdx)))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("can not find db row in
tablet health proc result"));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]