This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f7eac4e7401 [opt](cloud) cache cluster id per query and drop redundant locks on getBackendId hot path (#63636)
f7eac4e7401 is described below

commit f7eac4e74018439d85f0cdae9d5e0177dd39d6f6
Author: Xin Liao <[email protected]>
AuthorDate: Mon Jun 8 19:58:58 2026 +0800

    [opt](cloud) cache cluster id per query and drop redundant locks on getBackendId hot path (#63636)
    
    `CloudReplica.getBackendId` was the dominant FE hotspot in cloud-mode
    query/load planning. On a profiled query it accounted for **65.6% of
    total FE CPU samples** (async-profiler). Every replica in the plan
    re-ran the full cluster-id resolution pipeline even though the resolved
    cluster id is identical for every tablet in the same request.
    
    Call breakdown from the flame graph:
    
    ```
    Tablet.getNormalReplicaBackendPathMap            68.6%
    └─ CloudReplica.getBackendId                     65.6%
       └─ getCurrentClusterId                        61.5%
          ├─ getCloudClusterIdByName                 35.6%
          │  ├─ getCloudClusterNames                 24.7%   (rw-lock + ArrayList + stream.sorted)
          │  └─ waitForAutoStart                      9.7%   (StopWatch.start/stop even on NORMAL)
          ├─ getPhysicalCluster                      14.5%
          │  └─ getComputeGroupByName                14.5%   (rw-lock around ConcurrentHashMap)
          └─ getCloudStatusByName                     8.3%
             └─ getCloudStatusByIdNoLock             14.0%   (two stream passes, String.valueOf per BE)
    ```
    
    `ReadLock.unlock` consistently outweighed `ReadLock.lock` in the profile,
    a classic cache-line bouncing signature: the read-lock CAS was already
    operating in the non-linear regime, where each additional concurrent
    thread hurts throughput disproportionately. With high tablet counts the
    call rate (`tablets × replicas × concurrent_queries`) easily reached
    100k+ lock/unlock operations per second per FE, leaving the FE very
    little headroom before a metastable collapse.
    
    ### Changes
    
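    - **`getCurrentClusterId`** moves from a private method on
    `CloudReplica` to a public method on `CloudSystemInfoService`, so
    callers can resolve the cluster id once per request. Its name-to-id
    half (auto-start, existence check, id lookup) becomes
    `CloudSystemInfoService.resolveClusterIdByName`. `CloudReplica` gains
    `getBackendIdWithClusterId(String)`, which takes a pre-resolved cluster
    id and bypasses the per-replica pipeline.
    - **`OlapTableSink.createLocation`** and
    **`FrontendServiceImpl.{createPartition, replacePartition}`** resolve
    the cluster id once, lazily on the first cloud tablet, and pass it
    down via the new `CloudTablet.getNormalReplicaBackendPathMapByClusterId(String)`
    method. `OlapScanNode` applies the same lazy caching to its per-replica
    `getBackendId` calls. For a 10k-tablet query this collapses 10k full
    pipelines into 1.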
    - **`CloudSystemInfoService.getComputeGroupByName`**: merge
    `containsKey+get` into a single `get` and guard the debug log behind
    `isDebugEnabled` (the map `toString` is expensive). The read lock is
    kept: writers update both maps under the write lock, and the compound
    name->id->group lookup must observe a consistent snapshot.
    - **`CloudSystemInfoService.containsCloudCluster`** (new): cheap
    existence check that replaces `getCloudClusterNames().contains(name)`.
    Avoids an ArrayList copy + stream filter + natural sort + collect under
    a read lock for a single existence query.
    - **`CloudSystemInfoService.getCloudStatusByIdNoLock`**: single-pass
    loop with a precomputed `NORMAL` constant in place of two stream
    pipelines that re-evaluated `String.valueOf(NORMAL)` per backend.
    - **`CloudSystemInfoService.waitForAutoStart`**: fast-path return when
    the cluster is already `NORMAL`, skipping the
    `withTemporaryNereidsTimeout` wrap and the `StopWatch.start/stop` inside
    `waitForClusterToResume`. The while loop never executed in that state,
    but the wrapping still ran on every call (~3% of total FE CPU in the
    profile).
    
    ### Behavior
    
    Semantics preserved on all paths:
    - `waitForAutoStart` NORMAL fast-path: the original code already
    initialized `existAliveBe = true`, so `waitForClusterToResume` was a
    no-op for NORMAL clusters.
    - `getComputeGroupByName` keeps its read lock, so a reader still cannot
    observe the brief window between a rename's two map updates (which
    would surface as a `null` group and make `getPhysicalCluster` treat a
    virtual group name as a physical cluster). Only the redundant
    `containsKey` probe and the unconditional debug-log formatting are
    removed.
    - `containsCloudCluster` matches `getCloudClusterNames().contains(name)`:
    both return `false` for a null or empty `name` (the empty-name
    filtering in `getCloudClusterNames` only shapes the returned list), and
    both test registration for any non-empty `name`.
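    To make the two lookup claims above concrete, here is a small standalone
    sketch; it is not the Doris code. `ClusterRegistrySketch`, its
    string-valued groups, and `addCluster` are hypothetical, and it assumes
    (as the `getComputeGroupByName` hunk in the diff states) that writers
    update both maps under the write lock while the compound
    name->id->group read holds the read lock. It also checks that a direct,
    lock-free `containsKey` probe agrees with the old
    copy-filter-sort-`contains` path for registered, unregistered, and
    empty names.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.stream.Collectors;
    
    // Hypothetical stand-in for the two maps in CloudSystemInfoService; not the Doris class.
    class ClusterRegistrySketch {
        private final Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
        private final Map<String, String> idToGroup = new ConcurrentHashMap<>();
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    
        // Writers update both maps under the write lock.
        void addCluster(String name, String id, String group) {
            rwLock.writeLock().lock();
            try {
                clusterNameToId.put(name, id);
                idToGroup.put(id, group);
            } finally {
                rwLock.writeLock().unlock();
            }
        }
    
        // Compound name -> id -> group lookup: one get per map, under the read lock,
        // so it never observes a half-applied writer update.
        String getComputeGroupByName(String name) {
            rwLock.readLock().lock();
            try {
                String id = clusterNameToId.get(name);
                return id == null ? null : idToGroup.get(id);
            } finally {
                rwLock.readLock().unlock();
            }
        }
    
        // Old existence path: copy, drop empty names, sort, collect, all under the read lock.
        List<String> getCloudClusterNames() {
            rwLock.readLock().lock();
            try {
                return new ArrayList<>(clusterNameToId.keySet()).stream()
                        .filter(n -> !n.isEmpty())
                        .sorted()
                        .collect(Collectors.toList());
            } finally {
                rwLock.readLock().unlock();
            }
        }
    
        // New existence path: a single lock-free ConcurrentHashMap probe.
        boolean containsCloudCluster(String name) {
            return name != null && !name.isEmpty() && clusterNameToId.containsKey(name);
        }
    
        public static void main(String[] args) {
            ClusterRegistrySketch registry = new ClusterRegistrySketch();
            registry.addCluster("cg_a", "id_1", "group_1");
            registry.addCluster("", "id_2", "group_2"); // empty name is filtered out of the list
            for (String name : new String[] {"cg_a", "cg_missing", ""}) {
                boolean before = registry.getCloudClusterNames().contains(name);
                boolean after = registry.containsCloudCluster(name);
                System.out.println("'" + name + "': list=" + before + ", direct=" + after);
            }
            System.out.println(registry.getComputeGroupByName("cg_a"));
        }
    }
    ```
    
    The existence check touches only one map, so a single
    `ConcurrentHashMap.containsKey` is already thread-safe without the
    lock, whereas the two-map lookup still needs the read lock to avoid
    seeing one half of a writer's update without the other.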
    
    ### Further comments
    
    The cluster id resolution is hoisted only on the no-BE-endpoint paths
    (the hot ones in the profile). The BE-endpoint path in
    `FrontendServiceImpl` keeps its own per-endpoint resolution (now routed
    through `CloudSystemInfoService.resolveClusterIdByName`) and is not
    hoisted.
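    A minimal sketch of the lazy, once-per-request hoisting and the
    endpoint split described above. Everything here is hypothetical:
    `resolveCurrentClusterId` stands in for the full `getCurrentClusterId`
    pipeline (priv, status, auto-start, existence) and only counts how
    often it runs, and `resolveByEndpoint` stands in for the endpoint path,
    which stays per tablet.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical sketch of hoisting the cluster-id resolution out of a per-tablet loop.
    class HoistedClusterIdSketch {
        // Counts how many times the expensive pipeline ran.
        static int fullResolutions = 0;
    
        // Stand-in for the full getCurrentClusterId() pipeline.
        static String resolveCurrentClusterId() {
            fullResolutions++;
            return "cluster-1";
        }
    
        // Stand-in for the BE-endpoint path, which is not hoisted.
        static String resolveByEndpoint(String beEndpoint) {
            return "cluster-of-" + beEndpoint;
        }
    
        // Returns one "tabletId@clusterId" entry per tablet.
        static List<String> planTablets(List<Long> tabletIds, String beEndpoint) {
            List<String> locations = new ArrayList<>();
            String cachedClusterId = null; // lazy: resolved on the first tablet that needs it
            for (long tabletId : tabletIds) {
                String clusterId;
                if (beEndpoint != null) {
                    clusterId = resolveByEndpoint(beEndpoint);
                } else {
                    if (cachedClusterId == null) {
                        cachedClusterId = resolveCurrentClusterId();
                    }
                    clusterId = cachedClusterId;
                }
                locations.add(tabletId + "@" + clusterId);
            }
            return locations;
        }
    
        public static void main(String[] args) {
            List<Long> tablets = new ArrayList<>();
            for (long id = 0; id < 10_000; id++) {
                tablets.add(id);
            }
            planTablets(tablets, null);
            System.out.println("full resolutions for 10k tablets: " + fullResolutions);
        }
    }
    ```
    
    Using `null` as the "not yet resolved" marker, as the diff does with
    `cachedClusterId`, means a request with no cloud tablets never pays for
    the resolution; it also means a resolver that legitimately returned
    `null` would be re-run for every tablet.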
---
 .../java/org/apache/doris/alter/AlterJobV2.java    |   2 +-
 .../apache/doris/cloud/catalog/CloudReplica.java   | 125 +++++----------------
 .../apache/doris/cloud/catalog/CloudTablet.java    |  15 ++-
 .../doris/cloud/system/CloudSystemInfoService.java |  93 ++++++++++++++-
 .../org/apache/doris/planner/OlapScanNode.java     |  26 ++++-
 .../org/apache/doris/planner/OlapTableSink.java    |  17 ++-
 .../apache/doris/service/FrontendServiceImpl.java  |  35 +++++-
 .../cloud/system/CloudSystemInfoServiceTest.java   |  15 +++
 8 files changed, 213 insertions(+), 115 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index cab36e2eacf..142ef24f7e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -255,7 +255,7 @@ public abstract class AlterJobV2 implements Writable {
             ConnectContext ctx = new ConnectContext();
             ctx.setThreadLocalInfo();
             ctx.setCloudCluster(cloudClusterName);
-            // currently used for CloudReplica.getCurrentClusterId
+            // currently used for CloudSystemInfoService.getCurrentClusterId
             // later maybe used for managing all workload in BE.
             ctx.setCurrentUserIdentity(this.userIdentity);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 9568825cc4f..0924406c50b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -17,20 +17,17 @@
 
 package org.apache.doris.cloud.catalog;
 
-import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Replica;
-import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.qe.ComputeGroupException;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.base.Strings;
 import com.google.common.hash.HashCode;
@@ -38,7 +35,6 @@ import com.google.common.hash.Hashing;
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -180,20 +176,25 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
 
     @Override
     public long getBackendId() throws ComputeGroupException {
-        return getBackendIdImpl(getCurrentClusterId());
+        return getBackendIdImpl(cloudInfoService().getCurrentClusterId());
     }
 
-    public long getBackendId(String beEndpoint) {
-        String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByBeAddr(beEndpoint);
-        String physicalClusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                .getPhysicalCluster(clusterName);
+    // Variant for callers that have already resolved the cluster id once per request
+    // and want to skip the per-replica ConnectContext/priv/status/autoStart/existence pipeline.
+    public long getBackendIdWithClusterId(String clusterId) throws ComputeGroupException {
+        return getBackendIdImpl(clusterId);
+    }
 
+    public long getBackendId(String beEndpoint) {
         try {
-            String clusterId = getCloudClusterIdByName(physicalClusterName);
+            CloudSystemInfoService infoService = cloudInfoService();
+            String clusterName = infoService.getClusterNameByBeAddr(beEndpoint);
+            String physicalClusterName = infoService.getPhysicalCluster(clusterName);
+            String clusterId = infoService.resolveClusterIdByName(physicalClusterName);
             return getBackendIdImpl(clusterId);
         } catch (ComputeGroupException e) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("failed to get compute group name {}", physicalClusterName, e);
+                LOG.debug("failed to get compute group name for endpoint {}", beEndpoint, e);
             }
             return -1;
         }
@@ -202,7 +203,7 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
     public long getPrimaryBackendId() {
         String clusterId;
         try {
-            clusterId = getCurrentClusterId();
+            clusterId = cloudInfoService().getCurrentClusterId();
         } catch (ComputeGroupException e) {
             return -1L;
         }
@@ -214,6 +215,19 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
         return getClusterPrimaryBackendId(clusterId);
     }
 
+    // Returns the CloudSystemInfoService instance, or throws ComputeGroupException when
+    // Env was set up with a base SystemInfoService (typically a unit test that mocks the
+    // base type). Production cloud-mode FE never hits the throw branch.
+    private static CloudSystemInfoService cloudInfoService() throws ComputeGroupException {
+        SystemInfoService info = Env.getCurrentSystemInfo();
+        if (info instanceof CloudSystemInfoService) {
+            return (CloudSystemInfoService) info;
+        }
+        throw new ComputeGroupException(
+                "current system info service is not cloud-aware",
+                ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
+    }
+
     public long getClusterPrimaryBackendId(String clusterId) {
         if (isColocated()) {
             try {
@@ -267,91 +281,6 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
         return result;
     }
 
-    private String getCurrentClusterId() throws ComputeGroupException {
-        // Not in a connect session
-        String cluster = null;
-        ConnectContext context = ConnectContext.get();
-        if (context != null) {
-            // TODO(wb) rethinking whether should update err status.
-            cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                    .getPhysicalCluster(context.getCloudCluster());
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("get compute group by context {}", cluster);
-            }
-
-            UserIdentity currentUid = context.getCurrentUserIdentity();
-            if (currentUid != null && !StringUtils.isEmpty(currentUid.getQualifiedUser())) {
-                try {
-                    ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
-                } catch (Exception e) {
-                    LOG.warn("check compute group {} for {} auth failed.", cluster,
-                            context.getCurrentUserIdentity().toString());
-                    throw new ComputeGroupException(
-                            String.format("context compute group %s check auth failed, user is %s",
-                                    cluster, context.getCurrentUserIdentity().toString()),
-                            ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
-                }
-            } else {
-                LOG.info("connect context user is null.");
-                throw new ComputeGroupException("connect context's user is null",
-                        ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
-            }
-
-            String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                    .getCloudStatusByName(cluster);
-            if (!Strings.isNullOrEmpty(clusterStatus)
-                    && Cloud.ClusterStatus.valueOf(clusterStatus)
-                    == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
-                LOG.warn("auto start compute group {} in manual shutdown status", cluster);
-                throw new ComputeGroupException(
-                        String.format("The current compute group %s has been manually shutdown", cluster),
-                        ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
-            }
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("connect context is null in getBackendId");
-            }
-            throw new ComputeGroupException("connect context not set cluster ",
-                    ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
-        }
-
-        return getCloudClusterIdByName(cluster);
-    }
-
-    private String getCloudClusterIdByName(String cluster) throws ComputeGroupException {
-        // if cluster is SUSPENDED, wait
-        String wakeUPCluster = "";
-        try {
-            wakeUPCluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
-        } catch (DdlException e) {
-            // this function cant throw exception. so just log it
-            LOG.warn("cant resume compute group {}, exception", cluster, e);
-        }
-        if (!Strings.isNullOrEmpty(wakeUPCluster) && !cluster.equals(wakeUPCluster)) {
-            cluster = wakeUPCluster;
-            LOG.warn("get backend input compute group {} useless, so auto start choose a new one compute group {}",
-                    cluster, wakeUPCluster);
-        }
-        // check default compute group valid.
-        if (Strings.isNullOrEmpty(cluster)) {
-            LOG.warn("failed to get available be, clusterName: {}", cluster);
-            throw new ComputeGroupException("compute group name is empty",
-                ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP);
-        }
-        boolean exist = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                .getCloudClusterNames().contains(cluster);
-        if (!exist) {
-            // can't use this default compute group, plz change another
-            LOG.warn("compute group: {} is not existed", cluster);
-            throw new ComputeGroupException(
-                String.format("The current compute group %s is not registered in the system", cluster),
-                ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
-        }
-
-        return ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);
-    }
-
     private long getBackendIdImpl(String clusterId) throws ComputeGroupException {
         if (Strings.isNullOrEmpty(clusterId)) {
             return -1L;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
index c1a3ed71290..541570b079e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -74,7 +75,19 @@ public class CloudTablet extends Tablet implements GsonPostProcessable {
 
     @Override
     public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws UserException {
-        Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMap();
+        // Per-tablet entry point: resolves cluster id here for callers that don't hoist it.
+        // High-tablet-count callers should resolve once and pass the cluster id in via
+        // getNormalReplicaBackendPathMapByClusterId to amortize the resolution.
+        String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCurrentClusterId();
+        return getNormalReplicaBackendPathMapByClusterId(clusterId);
+    }
+
+    // Cluster id is supplied by the caller (resolved lazily once per request),
+    // bypassing the per-replica ConnectContext/priv/status/autoStart/existence pipeline.
+    public Multimap<Long, Long> getNormalReplicaBackendPathMapByClusterId(String clusterId) throws UserException {
+        TabletSlidingWindowAccessStats.recordTablet(getId());
+        Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMapImpl(null,
+                (rep, be) -> ((CloudReplica) rep).getBackendIdWithClusterId(clusterId));
         return backendPathMapReprocess(pathMap);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 41ff4d87fa2..9d960df6507 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.cloud.system;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.cloud.catalog.CloudEnv;
@@ -118,18 +119,100 @@ public class CloudSystemInfoService extends SystemInfoService {
     }
 
     public ComputeGroup getComputeGroupByName(String computeGroupName) {
-        LOG.debug("get id {} computeGroupIdToComputeGroup : {} ", computeGroupName, computeGroupIdToComputeGroup);
+        // rlock guards the compound name->id->group lookup: writers (add/remove/rename)
+        // update both maps under wlock, and the read must observe a consistent snapshot
+        // so callers like getPhysicalCluster don't transiently see a virtual group name
+        // with a null group and fall back to treating it as a physical cluster.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get id {} computeGroupIdToComputeGroup : {} ", computeGroupName, computeGroupIdToComputeGroup);
+        }
         try {
             rlock.lock();
-            if (!clusterNameToId.containsKey(computeGroupName)) {
-                return null;
-            }
-            return computeGroupIdToComputeGroup.get(clusterNameToId.get(computeGroupName));
+            String id = clusterNameToId.get(computeGroupName);
+            return id == null ? null : computeGroupIdToComputeGroup.get(id);
         } finally {
             rlock.unlock();
         }
     }
 
+    public boolean containsCloudCluster(String clusterName) {
+        return !Strings.isNullOrEmpty(clusterName) && clusterNameToId.containsKey(clusterName);
+    }
+
+    // Resolve the cluster id for the current ConnectContext: physical-cluster lookup,
+    // priv check, status check (reject MANUAL_SHUTDOWN), wait-for-autoStart, existence
+    // check, finally name->id mapping. The result is identical for every tablet/replica
+    // within a single request, so hot paths should resolve once and reuse the cached value.
+    public String getCurrentClusterId() throws ComputeGroupException {
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("connect context is null in getCurrentClusterId");
+            }
+            throw new ComputeGroupException("connect context not set cluster ",
+                    ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
+        }
+
+        String cluster = getPhysicalCluster(context.getCloudCluster());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get compute group by context {}", cluster);
+        }
+
+        UserIdentity currentUid = context.getCurrentUserIdentity();
+        if (currentUid == null || Strings.isNullOrEmpty(currentUid.getQualifiedUser())) {
+            LOG.info("connect context user is null.");
+            throw new ComputeGroupException("connect context's user is null",
+                    ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
+        } catch (Exception e) {
+            LOG.warn("check compute group {} for {} auth failed.", cluster, currentUid);
+            throw new ComputeGroupException(
+                    String.format("context compute group %s check auth failed, user is %s", cluster, currentUid),
+                    ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+        }
+
+        String clusterStatus = getCloudStatusByName(cluster);
+        if (!Strings.isNullOrEmpty(clusterStatus)
+                && Cloud.ClusterStatus.valueOf(clusterStatus) == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
+            LOG.warn("auto start compute group {} in manual shutdown status", cluster);
+            throw new ComputeGroupException(
+                    String.format("The current compute group %s has been manually shutdown", cluster),
+                    ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
+        }
+
+        return resolveClusterIdByName(cluster);
+    }
+
+    // Resolve a known cluster name to its id, handling auto-start (cluster may resume
+    // under a different name) and validating the cluster is registered.
+    public String resolveClusterIdByName(String cluster) throws ComputeGroupException {
+        String wakeUPCluster = "";
+        try {
+            wakeUPCluster = waitForAutoStart(cluster);
+        } catch (DdlException e) {
+            LOG.warn("cant resume compute group {}, exception", cluster, e);
+        }
+        if (!Strings.isNullOrEmpty(wakeUPCluster) && !wakeUPCluster.equals(cluster)) {
+            LOG.warn("get backend input compute group {} useless, so auto start choose a new one compute group {}",
+                    cluster, wakeUPCluster);
+            cluster = wakeUPCluster;
+        }
+        if (Strings.isNullOrEmpty(cluster)) {
+            LOG.warn("failed to get available be, clusterName: {}", cluster);
+            throw new ComputeGroupException("compute group name is empty",
+                ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP);
+        }
+        if (!containsCloudCluster(cluster)) {
+            LOG.warn("compute group: {} is not existed", cluster);
+            throw new ComputeGroupException(
+                String.format("The current compute group %s is not registered in the system", cluster),
+                ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
+        }
+        return getCloudClusterIdByName(cluster);
+    }
+
     public ComputeGroup getComputeGroupById(String computeGroupId) {
         try {
             rlock.lock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index a2021bbd493..11f8a24cca1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ColumnToThrift;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.IndexToThriftConvertor;
@@ -54,7 +55,9 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.stream.OlapTableStreamUpdate;
 import org.apache.doris.catalog.stream.OlapTableStreamWrapper;
+import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
@@ -499,6 +502,8 @@ public class OlapScanNode extends ScanNode {
         ImmutableMap<Long, Backend> allBackends = olapTable.getAllBackendsByAllCluster();
         long partitionVisibleVersion = visibleVersion;
         String partitionVisibleVersionStr = fastToString(visibleVersion);
+        // Lazy: resolved on the first CloudReplica that needs it.
+        String cachedClusterId = null;
         for (Tablet tablet : tablets) {
             long tabletId = tablet.getId();
             long tabletVisibleVersion = partitionVisibleVersion;
@@ -567,7 +572,16 @@ public class OlapScanNode extends ScanNode {
                 replicas.sort(Replica.ID_COMPARATOR);
                 Replica replica = replicas.get(useFixReplica >= replicas.size() ? replicas.size() - 1 : useFixReplica);
                 if (context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) {
-                    long beId = replica.getBackendId();
+                    long beId;
+                    if (replica instanceof CloudReplica) {
+                        if (cachedClusterId == null) {
+                            cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                    .getCurrentClusterId();
+                        }
+                        beId = ((CloudReplica) replica).getBackendIdWithClusterId(cachedClusterId);
+                    } else {
+                        beId = replica.getBackendId();
+                    }
                     Backend backend = allBackends.get(beId);
                     // If the fixed replica is bad, then not clear the replicas using random replica
                     if (backend == null || !backend.isAlive()) {
@@ -621,7 +635,15 @@ public class OlapScanNode extends ScanNode {
                 Backend backend = null;
                 long backendId = -1;
                 try {
-                    backendId = replica.getBackendId();
+                    if (replica instanceof CloudReplica) {
+                        if (cachedClusterId == null) {
+                            cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                    .getCurrentClusterId();
+                        }
+                        backendId = ((CloudReplica) replica).getBackendIdWithClusterId(cachedClusterId);
+                    } else {
+                        backendId = replica.getBackendId();
+                    }
                     backend = allBackends.get(backendId);
                 } catch (ComputeGroupException e) {
                     LOG.warn("failed to get backend {} for replica {}", backendId, replica.getId(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index e3a9954b627..8f91dcf1462 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -48,7 +48,9 @@ import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -789,6 +791,8 @@ public class OlapTableSink extends DataSink {
         TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
         // BE id -> path hash
         Multimap<Long, Long> allBePathsMap = HashMultimap.create();
+        // Lazy: resolved on the first CloudTablet that needs it.
+        String cachedClusterId = null;
         for (long partitionId : partitionIds) {
             Partition partition = table.getPartition(partitionId);
             int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId());
@@ -799,7 +803,16 @@ public class OlapTableSink extends DataSink {
                     StringBuilder errMsgBuilder = new StringBuilder();
                     Multimap<Long, Long> bePathsMap = HashMultimap.create();
                     try {
-                        bePathsMap = tablet.getNormalReplicaBackendPathMap();
+                        if (tablet instanceof CloudTablet) {
+                            if (cachedClusterId == null) {
+                                cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                        .getCurrentClusterId();
+                            }
+                            bePathsMap = ((CloudTablet) tablet)
+                                    .getNormalReplicaBackendPathMapByClusterId(cachedClusterId);
+                        } else {
+                            bePathsMap = tablet.getNormalReplicaBackendPathMap();
+                        }
                         if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
                             errMsgBuilder.append("tablet ").append(tablet.getId())
                                     .append(" alive replica num ").append(bePathsMap.keySet().size())
@@ -825,7 +838,7 @@ public class OlapTableSink extends DataSink {
                     } catch (ComputeGroupException e) {
                         LOG.warn("failed to get replica backend path for tablet " + tablet.getId(), e);
                         errMsgBuilder.append(", ").append(e.toString());
-                        throw new UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString());
+                        throw new UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString(), e);
                     }
                     if (!Config.isCloudMode()) {
                         debugWriteRandomChooseSink(tablet, partition.getVisibleVersion(), bePathsMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d008294b220..f01de48cf7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -4535,6 +4535,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         List<TTabletLocation> tablets = new ArrayList<>();
         List<TTabletLocation> slaveTablets = new ArrayList<>();
         List<TOlapTablePartition> partitions = Lists.newArrayList();
+        final boolean hasBeEndpoint = request.isSetBeEndpoint();
+        // Lazy: resolved on the first CloudTablet that needs it (skipped on cache-hit).
+        String cachedClusterId = null;
         for (String partitionName : addPartitionClauseMap.keySet()) {
             Partition partition = table.getPartition(partitionName);
             // For thread safety, we preserve the tablet distribution information of each partition
@@ -4578,9 +4581,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     // BE id -> path hash
                     Multimap<Long, Long> bePathsMap;
                     try {
-                        if (Config.isCloudMode() && request.isSetBeEndpoint()) {
-                            bePathsMap = ((CloudTablet) tablet)
-                                    .getNormalReplicaBackendPathMap(request.be_endpoint);
+                        if (tablet instanceof CloudTablet) {
+                            CloudTablet cloudTablet = (CloudTablet) tablet;
+                            if (hasBeEndpoint) {
+                                bePathsMap = cloudTablet.getNormalReplicaBackendPathMap(request.be_endpoint);
+                            } else {
+                                if (cachedClusterId == null) {
+                                    cachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                            .getCurrentClusterId();
+                                }
+                                bePathsMap = cloudTablet.getNormalReplicaBackendPathMapByClusterId(cachedClusterId);
+                            }
                         } else {
                             bePathsMap = tablet.getNormalReplicaBackendPathMap();
                         }
@@ -4852,6 +4863,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         List<TTabletLocation> tablets = new ArrayList<>();
         List<TTabletLocation> slaveTablets = new ArrayList<>();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
+        // Lazy: resolved on the first CloudTablet that needs it.
+        String replaceCachedClusterId = null;
         for (long partitionId : resultPartitionIds) {
             Partition partition = olapTable.getPartition(partitionId);
             // For thread safety, we preserve the tablet distribution information of each partition
@@ -4897,9 +4911,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     // BE id -> path hash
                     Multimap<Long, Long> bePathsMap;
                     try {
-                        if (Config.isCloudMode() && request.isSetBeEndpoint()) {
-                            bePathsMap = ((CloudTablet) tablet)
-                                    .getNormalReplicaBackendPathMap(request.be_endpoint);
+                        if (tablet instanceof CloudTablet) {
+                            CloudTablet cloudTablet = (CloudTablet) tablet;
+                            if (replaceHasBeEndpoint) {
+                                bePathsMap = cloudTablet.getNormalReplicaBackendPathMap(request.be_endpoint);
+                            } else {
+                                if (replaceCachedClusterId == null) {
+                                    replaceCachedClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                            .getCurrentClusterId();
+                                }
+                                bePathsMap = cloudTablet
+                                        .getNormalReplicaBackendPathMapByClusterId(replaceCachedClusterId);
+                            }
                         } else {
                             bePathsMap = tablet.getNormalReplicaBackendPathMap();
                         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 8e0ee5c15a7..83f62b351ee 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -1039,6 +1039,21 @@ public class CloudSystemInfoServiceTest {
         }
     }
 
+    @Test
+    public void testContainsCloudCluster() {
+        infoService = new CloudSystemInfoService();
+        // Empty / null inputs short-circuit without touching the map.
+        Assert.assertFalse(infoService.containsCloudCluster(null));
+        Assert.assertFalse(infoService.containsCloudCluster(""));
+        // Unknown cluster name -> false.
+        Assert.assertFalse(infoService.containsCloudCluster("absent_cluster"));
+        // Register a cluster; lookup must hit.
+        infoService.addVirtualClusterInfoToMapsNoLock("cid_1", "cluster_1");
+        Assert.assertTrue(infoService.containsCloudCluster("cluster_1"));
+        // Different name in same map -> still false.
+        Assert.assertFalse(infoService.containsCloudCluster("cluster_2"));
+    }
+
     /**
      * Helper method to create a test ConnectContext with specific cluster name
      */
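
The three loops changed above share one pattern: resolve the cluster id lazily, at most once per request, and reuse it for every CloudTablet; non-cloud tablets (and, in FrontendServiceImpl, requests that carry be_endpoint) never pay for the resolution. The standalone Java sketch below isolates that pattern outside Doris. FakeCloudTablet, FakeLocalTablet, planLocations and the Supplier-based resolver are hypothetical stand-ins, not Doris APIs; the supplier plays the role of CloudSystemInfoService.getCurrentClusterId().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the "resolve once per request, reuse per tablet" pattern.
// None of these types are Doris classes; they only mirror the shape of the loops in the diff.
public class LazyClusterIdDemo {
    public interface Tablet {
        long getId();
    }

    // Stand-in for CloudTablet: its replicas are located through a cluster id.
    public static final class FakeCloudTablet implements Tablet {
        private final long id;
        public FakeCloudTablet(long id) { this.id = id; }
        @Override public long getId() { return id; }
    }

    // Stand-in for a non-cloud tablet: it never needs the cluster id.
    public static final class FakeLocalTablet implements Tablet {
        private final long id;
        public FakeLocalTablet(long id) { this.id = id; }
        @Override public long getId() { return id; }
    }

    // Returns "id@location" for each tablet, calling resolveClusterId at most once.
    public static List<String> planLocations(List<Tablet> tablets, Supplier<String> resolveClusterId) {
        // Null until the first cloud tablet needs it, like cachedClusterId in the diff.
        String cachedClusterId = null;
        List<String> locations = new ArrayList<>();
        for (Tablet tablet : tablets) {
            if (tablet instanceof FakeCloudTablet) {
                if (cachedClusterId == null) {
                    // The expensive lookup (locks, name resolution) runs here only once.
                    cachedClusterId = resolveClusterId.get();
                }
                locations.add(tablet.getId() + "@" + cachedClusterId);
            } else {
                // Non-cloud tablets skip the resolver entirely.
                locations.add(tablet.getId() + "@local");
            }
        }
        return locations;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Supplier<String> resolver = () -> {
            calls[0]++;
            return "cid_1";
        };
        List<Tablet> tablets = List.of(
                new FakeCloudTablet(1), new FakeLocalTablet(2), new FakeCloudTablet(3));
        // Prints: [1@cid_1, 2@local, 3@cid_1] resolver calls=1
        System.out.println(planLocations(tablets, resolver) + " resolver calls=" + calls[0]);
    }
}
```

As in the loops above, the sketch caches only a non-null result, so a resolver that returned null would be invoked again for the next cloud tablet; the cost saving depends on the lookup succeeding on the first call.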

