This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f7eac4e7401 [opt](cloud) cache cluster id per query and drop redundant
locks on getBackendId hot path (#63636)
f7eac4e7401 is described below
commit f7eac4e74018439d85f0cdae9d5e0177dd39d6f6
Author: Xin Liao <[email protected]>
AuthorDate: Mon Jun 8 19:58:58 2026 +0800
[opt](cloud) cache cluster id per query and drop redundant locks on
getBackendId hot path (#63636)
`CloudReplica.getBackendId` was the dominant FE hotspot in cloud-mode
query/load planning. On a profiled query it accounted for **65.6% of
total FE CPU samples** (async-profiler). Every replica in the plan
re-ran the full cluster-id resolution pipeline even though the resolved
cluster id is identical for every tablet in the same request.
Call breakdown from the flame graph:
```
Tablet.getNormalReplicaBackendPathMap              68.6%
└─ CloudReplica.getBackendId                       65.6%
   └─ getCurrentClusterId                          61.5%
      ├─ getCloudClusterIdByName                   35.6%
      │  ├─ getCloudClusterNames                   24.7%  (rw-lock + ArrayList + stream.sorted)
      │  └─ waitForAutoStart                        9.7%  (StopWatch.start/stop even on NORMAL)
      ├─ getPhysicalCluster                        14.5%
      │  └─ getComputeGroupByName                  14.5%  (rw-lock around ConcurrentHashMap)
      └─ getCloudStatusByName                       8.3%
         └─ getCloudStatusByIdNoLock               14.0%  (two stream passes, String.valueOf per BE)
```
`ReadLock.unlock` consistently outweighed `ReadLock.lock` in the profile
-- a classic cache-line bouncing signature, meaning the read-lock CAS
was already operating in the non-linear regime where adding concurrent
threads disproportionately hurts throughput. With high tablet counts the
call rate (`tablets × replicas × concurrent_queries`) easily reached
100k+ lock/unlock per second per FE, putting the FE one nudge away from
a metastable collapse.
### Changes
- **`getCurrentClusterId`** moves from a private method on
`CloudReplica` to a public method on `CloudSystemInfoService`, so
callers can resolve once per request. `CloudReplica` adds
`getBackendIdWithClusterId(String)`, which bypasses the per-replica
pipeline.
- **`OlapScanNode`**, **`OlapTableSink.createLocation`** and
**`FrontendServiceImpl.{createPartition, replacePartition}`** resolve
the cluster id lazily, once per request, while iterating tablets and
pass it down via the new
`CloudTablet.getNormalReplicaBackendPathMapByClusterId(String)` (or
`CloudReplica.getBackendIdWithClusterId(String)` in the scan node). For
a 10k-tablet query this collapses 10k full pipelines into 1.
- **`CloudSystemInfoService.getComputeGroupByName`**: merge
`containsKey+get` into a single `get` and guard the debug log behind
`isDebugEnabled` (the map `toString` is expensive). In the diff below
the read lock is kept, so that the compound name->id->group lookup
observes a consistent snapshot.
- **`CloudSystemInfoService.containsCloudCluster`** (new): cheap
existence check that replaces `getCloudClusterNames().contains(name)`.
Avoids an ArrayList copy + stream filter + natural sort + collect under
a read lock for a single existence query.
- **`CloudSystemInfoService.getCloudStatusByIdNoLock`**: single-pass
loop with a precomputed `NORMAL` constant in place of two stream
pipelines that re-evaluated `String.valueOf(NORMAL)` per backend.
- **`CloudSystemInfoService.waitForAutoStart`**: fast-path return when
the cluster is already `NORMAL`, skipping the
`withTemporaryNereidsTimeout` wrap and the `StopWatch.start/stop` inside
`waitForClusterToResume` (the while loop never executed in that state
but the wrapping still ran per call -- ~3% of total FE CPU in the
profile).
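
The hoisting pattern used by the call sites can be sketched as below. This is a minimal illustration, not the Doris code: `ClusterIdHoistingSketch`, `resolveCurrentClusterId` and `backendIdFor` are hypothetical stand-ins for the resolution pipeline and the per-replica lookup, and the counter exists only to make the number of pipeline runs visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins; none of these names are Doris classes or methods.
class ClusterIdHoistingSketch {
    // Counts how often the "expensive" resolution pipeline runs.
    static final AtomicInteger RESOLVE_CALLS = new AtomicInteger();

    // Stand-in for the full pipeline (context lookup, privilege check, status check, ...).
    static String resolveCurrentClusterId() {
        RESOLVE_CALLS.incrementAndGet();
        return "cluster-id-1";
    }

    // Stand-in for a per-replica lookup that only needs the already-resolved id.
    static long backendIdFor(long tabletId, String clusterId) {
        return Math.floorMod((clusterId + ":" + tabletId).hashCode(), 8);
    }

    // Before: every tablet re-runs the resolution.
    static List<Long> planPerTablet(int tabletCount) {
        List<Long> backends = new ArrayList<>();
        for (long tabletId = 0; tabletId < tabletCount; tabletId++) {
            backends.add(backendIdFor(tabletId, resolveCurrentClusterId()));
        }
        return backends;
    }

    // After: resolve lazily on the first tablet that needs it, then reuse the value.
    static List<Long> planHoisted(int tabletCount) {
        List<Long> backends = new ArrayList<>();
        String cachedClusterId = null;
        for (long tabletId = 0; tabletId < tabletCount; tabletId++) {
            if (cachedClusterId == null) {
                cachedClusterId = resolveCurrentClusterId();
            }
            backends.add(backendIdFor(tabletId, cachedClusterId));
        }
        return backends;
    }

    public static void main(String[] args) {
        RESOLVE_CALLS.set(0);
        List<Long> perTablet = planPerTablet(10_000);
        int perTabletCalls = RESOLVE_CALLS.getAndSet(0);
        List<Long> hoisted = planHoisted(10_000);
        int hoistedCalls = RESOLVE_CALLS.get();
        System.out.println("pipeline runs: " + perTabletCalls + " vs " + hoistedCalls
                + ", same plan: " + perTablet.equals(hoisted));
    }
}
```

Resolving lazily inside the loop, rather than eagerly before it, means a request with no cloud tablets never pays for (or fails on) the resolution at all.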
### Behavior
Semantics preserved on all paths:
- `waitForAutoStart` NORMAL fast-path: the original code already had
`existAliveBe = true` as initializer, so `waitForClusterToResume` was a
no-op for NORMAL clusters.
- `getComputeGroupByName` still takes the read lock in the diff below:
writers (add/remove/rename) update both maps under the write lock, so
readers such as `getPhysicalCluster` do not see a virtual group name
with a `null` group in the window between a rename's two map updates.
- `containsCloudCluster` matches `getCloudClusterNames().contains(name)`
for non-empty `name` (empty-name filtering in `getCloudClusterNames` was
for the returned list, not for `.contains` semantics).
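
The `containsCloudCluster` equivalence can be illustrated with a small sketch. It assumes a plain name-to-id `ConcurrentHashMap`; `ContainsClusterSketch` and its methods are hypothetical stand-ins, and the list-building variant only approximates what the commit message says `getCloudClusterNames` does (filter empty names, sort, collect).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the name -> id registry; not the Doris class.
class ContainsClusterSketch {
    private final Map<String, String> clusterNameToId = new ConcurrentHashMap<>();

    void register(String name, String id) {
        clusterNameToId.put(name, id);
    }

    // List-building check: filter empty names, sort, collect, then scan the list.
    boolean containsViaNameList(String name) {
        List<String> names = clusterNameToId.keySet().stream()
                .filter(n -> !n.isEmpty())
                .sorted()
                .collect(Collectors.toList());
        return names.contains(name);
    }

    // Direct check: one hash lookup. The null guard is required because
    // ConcurrentHashMap.containsKey(null) throws NullPointerException.
    boolean containsDirect(String name) {
        return name != null && !name.isEmpty() && clusterNameToId.containsKey(name);
    }

    public static void main(String[] args) {
        ContainsClusterSketch registry = new ContainsClusterSketch();
        registry.register("cluster_1", "cid_1");
        for (String name : new String[] {"cluster_1", "cluster_2", ""}) {
            System.out.println("'" + name + "': list=" + registry.containsViaNameList(name)
                    + ", direct=" + registry.containsDirect(name));
        }
    }
}
```

Both checks agree for every name in the loop, while the direct one allocates nothing and does no sorting.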
### Further comments
The cluster id resolution is hoisted only on the no-BE-endpoint paths
(the hot ones in the profile). The endpoint-resolved path in
`FrontendServiceImpl` already has its own resolution logic and is
untouched.
---
.../java/org/apache/doris/alter/AlterJobV2.java | 2 +-
.../apache/doris/cloud/catalog/CloudReplica.java | 125 +++++----------------
.../apache/doris/cloud/catalog/CloudTablet.java | 15 ++-
.../doris/cloud/system/CloudSystemInfoService.java | 93 ++++++++++++++-
.../org/apache/doris/planner/OlapScanNode.java | 26 ++++-
.../org/apache/doris/planner/OlapTableSink.java | 17 ++-
.../apache/doris/service/FrontendServiceImpl.java | 35 +++++-
.../cloud/system/CloudSystemInfoServiceTest.java | 15 +++
8 files changed, 213 insertions(+), 115 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index cab36e2eacf..142ef24f7e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -255,7 +255,7 @@ public abstract class AlterJobV2 implements Writable {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
ctx.setCloudCluster(cloudClusterName);
- // currently used for CloudReplica.getCurrentClusterId
+ // currently used for CloudSystemInfoService.getCurrentClusterId
// later maybe used for managing all workload in BE.
ctx.setCurrentUserIdentity(this.userIdentity);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 9568825cc4f..0924406c50b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -17,20 +17,17 @@
package org.apache.doris.cloud.catalog;
-import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
-import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
@@ -38,7 +35,6 @@ import com.google.common.hash.Hashing;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -180,20 +176,25 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
@Override
public long getBackendId() throws ComputeGroupException {
- return getBackendIdImpl(getCurrentClusterId());
+ return getBackendIdImpl(cloudInfoService().getCurrentClusterId());
}
- public long getBackendId(String beEndpoint) {
- String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByBeAddr(beEndpoint);
- String physicalClusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getPhysicalCluster(clusterName);
+ // Variant for callers that have already resolved the cluster id once per request
+ // and want to skip the per-replica ConnectContext/priv/status/autoStart/existence pipeline.
+ public long getBackendIdWithClusterId(String clusterId) throws ComputeGroupException {
+ return getBackendIdImpl(clusterId);
+ }
+ public long getBackendId(String beEndpoint) {
try {
- String clusterId = getCloudClusterIdByName(physicalClusterName);
+ CloudSystemInfoService infoService = cloudInfoService();
+ String clusterName = infoService.getClusterNameByBeAddr(beEndpoint);
+ String physicalClusterName = infoService.getPhysicalCluster(clusterName);
+ String clusterId = infoService.resolveClusterIdByName(physicalClusterName);
return getBackendIdImpl(clusterId);
} catch (ComputeGroupException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("failed to get compute group name {}", physicalClusterName, e);
+ LOG.debug("failed to get compute group name for endpoint {}", beEndpoint, e);
}
return -1;
}
@@ -202,7 +203,7 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
public long getPrimaryBackendId() {
String clusterId;
try {
- clusterId = getCurrentClusterId();
+ clusterId = cloudInfoService().getCurrentClusterId();
} catch (ComputeGroupException e) {
return -1L;
}
@@ -214,6 +215,19 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
return getClusterPrimaryBackendId(clusterId);
}
+ // Returns the CloudSystemInfoService instance, or throws ComputeGroupException when
+ // Env was set up with a base SystemInfoService (typically a unit test that mocks the
+ // base type). Production cloud-mode FE never hits the throw branch.
+ private static CloudSystemInfoService cloudInfoService() throws ComputeGroupException {
+ SystemInfoService info = Env.getCurrentSystemInfo();
+ if (info instanceof CloudSystemInfoService) {
+ return (CloudSystemInfoService) info;
+ }
+ throw new ComputeGroupException(
+ "current system info service is not cloud-aware",
+ ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
+ }
+
public long getClusterPrimaryBackendId(String clusterId) {
if (isColocated()) {
try {
@@ -267,91 +281,6 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
return result;
}
- private String getCurrentClusterId() throws ComputeGroupException {
- // Not in a connect session
- String cluster = null;
- ConnectContext context = ConnectContext.get();
- if (context != null) {
- // TODO(wb) rethinking whether should update err status.
- cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getPhysicalCluster(context.getCloudCluster());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("get compute group by context {}", cluster);
- }
-
- UserIdentity currentUid = context.getCurrentUserIdentity();
- if (currentUid != null && !StringUtils.isEmpty(currentUid.getQualifiedUser())) {
- try {
- ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
- } catch (Exception e) {
- LOG.warn("check compute group {} for {} auth failed.", cluster,
- context.getCurrentUserIdentity().toString());
- throw new ComputeGroupException(
- String.format("context compute group %s check auth failed, user is %s",
- cluster, context.getCurrentUserIdentity().toString()),
- ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
- }
- } else {
- LOG.info("connect context user is null.");
- throw new ComputeGroupException("connect context's user is null",
- ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
- }
-
- String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getCloudStatusByName(cluster);
- if (!Strings.isNullOrEmpty(clusterStatus)
- && Cloud.ClusterStatus.valueOf(clusterStatus)
- == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
- LOG.warn("auto start compute group {} in manual shutdown status", cluster);
- throw new ComputeGroupException(
- String.format("The current compute group %s has been manually shutdown", cluster),
- ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("connect context is null in getBackendId");
- }
- throw new ComputeGroupException("connect context not set cluster ",
- ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
- }
-
- return getCloudClusterIdByName(cluster);
- }
-
- private String getCloudClusterIdByName(String cluster) throws ComputeGroupException {
- // if cluster is SUSPENDED, wait
- String wakeUPCluster = "";
- try {
- wakeUPCluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
- } catch (DdlException e) {
- // this function cant throw exception. so just log it
- LOG.warn("cant resume compute group {}, exception", cluster, e);
- }
- if (!Strings.isNullOrEmpty(wakeUPCluster) && !cluster.equals(wakeUPCluster)) {
- cluster = wakeUPCluster;
- LOG.warn("get backend input compute group {} useless, so auto start choose a new one compute group {}",
- cluster, wakeUPCluster);
- }
- // check default compute group valid.
- if (Strings.isNullOrEmpty(cluster)) {
- LOG.warn("failed to get available be, clusterName: {}", cluster);
- throw new ComputeGroupException("compute group name is empty",
- ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP);
- }
- boolean exist = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getCloudClusterNames().contains(cluster);
- if (!exist) {
- // can't use this default compute group, plz change another
- LOG.warn("compute group: {} is not existed", cluster);
- throw new ComputeGroupException(
- String.format("The current compute group %s is not registered in the system", cluster),
- ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
- }
-
- return ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);
- }
-
private long getBackendIdImpl(String clusterId) throws ComputeGroupException {
if (Strings.isNullOrEmpty(clusterId)) {
return -1L;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
index c1a3ed71290..541570b079e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -74,7 +75,19 @@ public class CloudTablet extends Tablet implements GsonPostProcessable {
@Override
public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws UserException {
- Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMap();
+ // Per-tablet entry point: resolves cluster id here for callers that don't hoist it.
+ // High-tablet-count callers should resolve once and pass the cluster id in via
+ // getNormalReplicaBackendPathMapByClusterId to amortize the resolution.
+ String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCurrentClusterId();
+ return getNormalReplicaBackendPathMapByClusterId(clusterId);
+ }
+
+ // Cluster id is supplied by the caller (resolved lazily once per request),
+ // bypassing the per-replica ConnectContext/priv/status/autoStart/existence pipeline.
+ public Multimap<Long, Long> getNormalReplicaBackendPathMapByClusterId(String clusterId) throws UserException {
+ TabletSlidingWindowAccessStats.recordTablet(getId());
+ Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMapImpl(null,
+ (rep, be) -> ((CloudReplica) rep).getBackendIdWithClusterId(clusterId));
return backendPathMapReprocess(pathMap);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 41ff4d87fa2..9d960df6507 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -17,6 +17,7 @@
package org.apache.doris.cloud.system;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.cloud.catalog.CloudEnv;
@@ -118,18 +119,100 @@ public class CloudSystemInfoService extends SystemInfoService {
}
public ComputeGroup getComputeGroupByName(String computeGroupName) {
- LOG.debug("get id {} computeGroupIdToComputeGroup : {} ", computeGroupName, computeGroupIdToComputeGroup);
+ // rlock guards the compound name->id->group lookup: writers (add/remove/rename)
+ // update both maps under wlock, and the read must observe a consistent snapshot
+ // so callers like getPhysicalCluster don't transiently see a virtual group name
+ // with a null group and fall back to treating it as a physical cluster.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get id {} computeGroupIdToComputeGroup : {} ", computeGroupName, computeGroupIdToComputeGroup);
+ }
try {
rlock.lock();
- if (!clusterNameToId.containsKey(computeGroupName)) {
- return null;
- }
- return computeGroupIdToComputeGroup.get(clusterNameToId.get(computeGroupName));
+ String id = clusterNameToId.get(computeGroupName);
+ return id == null ? null : computeGroupIdToComputeGroup.get(id);
} finally {
rlock.unlock();
}
}
+ public boolean containsCloudCluster(String clusterName) {
+ return !Strings.isNullOrEmpty(clusterName) && clusterNameToId.containsKey(clusterName);
+ }
+
+ // Resolve the cluster id for the current ConnectContext: physical-cluster lookup,
+ // priv check, status check (reject MANUAL_SHUTDOWN), wait-for-autoStart, existence
+ // check, finally name->id mapping. The result is identical for every tablet/replica
+ // within a single request, so hot paths should resolve once and reuse the cached value.
+ public String getCurrentClusterId() throws ComputeGroupException {
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("connect context is null in getCurrentClusterId");
+ }
+ throw new ComputeGroupException("connect context not set cluster ",
+ ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
+ }
+
+ String cluster = getPhysicalCluster(context.getCloudCluster());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get compute group by context {}", cluster);
+ }
+
+ UserIdentity currentUid = context.getCurrentUserIdentity();
+ if (currentUid == null || Strings.isNullOrEmpty(currentUid.getQualifiedUser())) {
+ LOG.info("connect context user is null.");
+ throw new ComputeGroupException("connect context's user is null",
+ ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+ }
+ try {
+ ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
+ } catch (Exception e) {
+ LOG.warn("check compute group {} for {} auth failed.", cluster, currentUid);
+ throw new ComputeGroupException(
+ String.format("context compute group %s check auth failed, user is %s", cluster, currentUid),
+ ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+ }
+
+ String clusterStatus = getCloudStatusByName(cluster);
+ if (!Strings.isNullOrEmpty(clusterStatus)
+ && Cloud.ClusterStatus.valueOf(clusterStatus) == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
+ LOG.warn("auto start compute group {} in manual shutdown status", cluster);
+ throw new ComputeGroupException(
+ String.format("The current compute group %s has been manually shutdown", cluster),
+ ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
+ }
+
+ return resolveClusterIdByName(cluster);
+ }
+
+ // Resolve a known cluster name to its id, handling auto-start (cluster may resume
+ // under a different name) and validating the cluster is registered.
+ public String resolveClusterIdByName(String cluster) throws ComputeGroupException {
+ String wakeUPCluster = "";
+ try {
+ wakeUPCluster = waitForAutoStart(cluster);
+ } catch (DdlException e) {
+ LOG.warn("cant resume compute group {}, exception", cluster, e);
+ }
+ if (!Strings.isNullOrEmpty(wakeUPCluster) && !cluster.equals(wakeUPCluster)) {
+ cluster = wakeUPCluster;
+ LOG.warn("get backend input compute group {} useless, so auto start choose a new one compute group {}",
+ cluster, wakeUPCluster);
+ }
+ if (Strings.isNullOrEmpty(cluster)) {
+ LOG.warn("failed to get available be, clusterName: {}", cluster);
+ throw new ComputeGroupException("compute group name is empty",
+ ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP);
+ }
+ if (!containsCloudCluster(cluster)) {
+ LOG.warn("compute group: {} is not existed", cluster);
+ throw new ComputeGroupException(
+ String.format("The current compute group %s is not registered in the system", cluster),
+ ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
+ }
+ return getCloudClusterIdByName(cluster);
+ }
+
public ComputeGroup getComputeGroupById(String computeGroupId) {
try {
rlock.lock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index a2021bbd493..11f8a24cca1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ColumnToThrift;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.IndexToThriftConvertor;
@@ -54,7 +55,9 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.stream.OlapTableStreamUpdate;
import org.apache.doris.catalog.stream.OlapTableStreamWrapper;
+import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
@@ -499,6 +502,8 @@ public class OlapScanNode extends ScanNode {
ImmutableMap<Long, Backend> allBackends = olapTable.getAllBackendsByAllCluster();
long partitionVisibleVersion = visibleVersion;
String partitionVisibleVersionStr = fastToString(visibleVersion);
+ // Lazy: resolved on the first CloudReplica that needs it.
+ String cachedClusterId = null;
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
long tabletVisibleVersion = partitionVisibleVersion;
@@ -567,7 +572,16 @@ public class OlapScanNode extends ScanNode {
replicas.sort(Replica.ID_COMPARATOR);
Replica replica = replicas.get(useFixReplica >= replicas.size() ? replicas.size() - 1 : useFixReplica);
if (context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) {
- long beId = replica.getBackendId();
+ long beId;
+ if (replica instanceof CloudReplica) {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ beId = ((CloudReplica) replica).getBackendIdWithClusterId(cachedClusterId);
+ } else {
+ beId = replica.getBackendId();
+ }
Backend backend = allBackends.get(beId);
// If the fixed replica is bad, then not clear the replicas using random replica
if (backend == null || !backend.isAlive()) {
@@ -621,7 +635,15 @@ public class OlapScanNode extends ScanNode {
Backend backend = null;
long backendId = -1;
try {
- backendId = replica.getBackendId();
+ if (replica instanceof CloudReplica) {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ backendId = ((CloudReplica) replica).getBackendIdWithClusterId(cachedClusterId);
+ } else {
+ backendId = replica.getBackendId();
+ }
backend = allBackends.get(backendId);
} catch (ComputeGroupException e) {
LOG.warn("failed to get backend {} for replica {}", backendId, replica.getId(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index e3a9954b627..8f91dcf1462 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -48,7 +48,9 @@ import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -789,6 +791,8 @@ public class OlapTableSink extends DataSink {
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
// BE id -> path hash
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
+ // Lazy: resolved on the first CloudTablet that needs it.
+ String cachedClusterId = null;
for (long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId());
@@ -799,7 +803,16 @@ public class OlapTableSink extends DataSink {
StringBuilder errMsgBuilder = new StringBuilder();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
try {
- bePathsMap = tablet.getNormalReplicaBackendPathMap();
+ if (tablet instanceof CloudTablet) {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ bePathsMap = ((CloudTablet) tablet)
+ .getNormalReplicaBackendPathMapByClusterId(cachedClusterId);
+ } else {
+ bePathsMap = tablet.getNormalReplicaBackendPathMap();
+ }
if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
errMsgBuilder.append("tablet ").append(tablet.getId())
.append(" alive replica num ").append(bePathsMap.keySet().size())
@@ -825,7 +838,7 @@ public class OlapTableSink extends DataSink {
} catch (ComputeGroupException e) {
LOG.warn("failed to get replica backend path for tablet " + tablet.getId(), e);
errMsgBuilder.append(", ").append(e.toString());
- throw new UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString());
+ throw new UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString(), e);
}
if (!Config.isCloudMode()) {
debugWriteRandomChooseSink(tablet, partition.getVisibleVersion(), bePathsMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d008294b220..f01de48cf7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -4535,6 +4535,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
List<TOlapTablePartition> partitions = Lists.newArrayList();
+ final boolean hasBeEndpoint = request.isSetBeEndpoint();
+ // Lazy: resolved on the first CloudTablet that needs it (skipped on cache-hit).
+ String cachedClusterId = null;
for (String partitionName : addPartitionClauseMap.keySet()) {
Partition partition = table.getPartition(partitionName);
// For thread safety, we preserve the tablet distribution information of each partition
@@ -4578,9 +4581,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// BE id -> path hash
Multimap<Long, Long> bePathsMap;
try {
- if (Config.isCloudMode() && request.isSetBeEndpoint()) {
- bePathsMap = ((CloudTablet) tablet)
- .getNormalReplicaBackendPathMap(request.be_endpoint);
+ if (tablet instanceof CloudTablet) {
+ CloudTablet cloudTablet = (CloudTablet) tablet;
+ if (hasBeEndpoint) {
+ bePathsMap = cloudTablet.getNormalReplicaBackendPathMap(request.be_endpoint);
+ } else {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ bePathsMap = cloudTablet.getNormalReplicaBackendPathMapByClusterId(cachedClusterId);
+ }
} else {
bePathsMap = tablet.getNormalReplicaBackendPathMap();
}
@@ -4852,6 +4863,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
+ // Lazy: resolved on the first CloudTablet that needs it.
+ String replaceCachedClusterId = null;
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
// For thread safety, we preserve the tablet distribution information of each partition
@@ -4897,9 +4911,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// BE id -> path hash
Multimap<Long, Long> bePathsMap;
try {
- if (Config.isCloudMode() && request.isSetBeEndpoint()) {
- bePathsMap = ((CloudTablet) tablet)
- .getNormalReplicaBackendPathMap(request.be_endpoint);
+ if (tablet instanceof CloudTablet) {
+ CloudTablet cloudTablet = (CloudTablet) tablet;
+ if (replaceHasBeEndpoint) {
+ bePathsMap = cloudTablet.getNormalReplicaBackendPathMap(request.be_endpoint);
+ } else {
+ if (replaceCachedClusterId == null) {
+ replaceCachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ bePathsMap = cloudTablet
+ .getNormalReplicaBackendPathMapByClusterId(replaceCachedClusterId);
+ }
} else {
bePathsMap = tablet.getNormalReplicaBackendPathMap();
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 8e0ee5c15a7..83f62b351ee 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -1039,6 +1039,21 @@ public class CloudSystemInfoServiceTest {
}
}
+ @Test
+ public void testContainsCloudCluster() {
+ infoService = new CloudSystemInfoService();
+ // Empty / null inputs short-circuit without touching the map.
+ Assert.assertFalse(infoService.containsCloudCluster(null));
+ Assert.assertFalse(infoService.containsCloudCluster(""));
+ // Unknown cluster name -> false.
+ Assert.assertFalse(infoService.containsCloudCluster("absent_cluster"));
+ // Register a cluster; lookup must hit.
+ infoService.addVirtualClusterInfoToMapsNoLock("cid_1", "cluster_1");
+ Assert.assertTrue(infoService.containsCloudCluster("cluster_1"));
+ // Different name in same map -> still false.
+ Assert.assertFalse(infoService.containsCloudCluster("cluster_2"));
+ }
+
/**
* Helper method to create a test ConnectContext with specific cluster name
*/