This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 064717f2803 [feature](merge-cloud) Add `CloudClusterChecker` to sync cluster info… (#30243)
064717f2803 is described below

commit 064717f2803f7448a5eb18ba59b704ebeb8e2890
Author: deardeng <[email protected]>
AuthorDate: Tue Jan 23 23:49:11 2024 +0800

    [feature](merge-cloud) Add `CloudClusterChecker` to sync cluster info… (#30243)
    
    
    Co-authored-by: Lightman <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |   8 +
 .../java/org/apache/doris/alter/SystemHandler.java |   4 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  39 +-
 .../doris/cloud/catalog/CloudClusterChecker.java   | 474 +++++++++++++++++++++
 .../org/apache/doris/cloud/catalog/CloudEnv.java   | 311 ++++++++++++++
 .../doris/cloud/system/CloudSystemInfoService.java | 209 +++++++++
 .../org/apache/doris/deploy/DeployManager.java     |   4 +-
 .../doris/httpv2/rest/manager/NodeAction.java      |   2 +-
 .../java/org/apache/doris/metric/MetricRepo.java   | 106 +++++
 .../main/java/org/apache/doris/system/Backend.java |   8 +
 .../org/apache/doris/system/SystemInfoService.java |  14 +-
 11 files changed, 1151 insertions(+), 28 deletions(-)
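
The heart of this change is CloudClusterChecker, a MasterDaemon that runs on the
master FE and periodically reconciles local FE/BE metadata against the meta
service. A minimal sketch of how the daemon is wired up (illustrative only, not
part of the patch; it assumes MasterDaemon invokes runAfterCatalogReady() once
per interval on the master):

    // Interval comes from the new Config.cloud_cluster_check_interval_second knob.
    CloudClusterChecker checker = new CloudClusterChecker(); // daemon name: "cloud cluster check"
    checker.start(); // from now on, runAfterCatalogReady() fires on every interval tick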

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 46194c484bd..0afede3905a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2476,6 +2476,14 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_cloud_snapshot_version = true;
 
+    @ConfField
+    public static int cloud_cluster_check_interval_second = 10;
+
+    @ConfField
+    public static String cloud_sql_server_cluster_name = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER";
+
+    @ConfField
+    public static String cloud_sql_server_cluster_id = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER";
     //==========================================================================
     //                      end of cloud config
     //==========================================================================
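
These are ordinary FE config items, so the checker's poll interval can be tuned
in fe.conf like any other @ConfField; for example (the value is illustrative):

    cloud_cluster_check_interval_second = 30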
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index 86551ba0735..e503e093787 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -152,7 +152,7 @@ public class SystemHandler extends AlterHandler {
         } else if (alterClause instanceof AddObserverClause) {
             AddObserverClause clause = (AddObserverClause) alterClause;
             Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(),
-                    clause.getPort());
+                    clause.getPort(), "");
         } else if (alterClause instanceof DropObserverClause) {
             DropObserverClause clause = (DropObserverClause) alterClause;
             Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(),
@@ -160,7 +160,7 @@ public class SystemHandler extends AlterHandler {
         } else if (alterClause instanceof AddFollowerClause) {
             AddFollowerClause clause = (AddFollowerClause) alterClause;
             Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(),
-                    clause.getPort());
+                    clause.getPort(), "");
         } else if (alterClause instanceof DropFollowerClause) {
             DropFollowerClause clause = (DropFollowerClause) alterClause;
             Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(),
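
The widened addFrontend signature lets callers supply an explicit node name,
while SQL-driven paths pass "" so the old name-generation behavior is kept. A
hedged sketch of the two call shapes (host/port/ctimeMs variables are
illustrative):

    // SQL path: empty nodeName, addFrontend falls back to genFeNodeName(...)
    Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, host, editLogPort, "");
    // cloud path (see CloudClusterChecker.getObserverFes below): name derived from meta-service ctime
    Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, host, editLogPort,
            CloudEnv.genFeNodeNameFromMeta(host, editLogPort, ctimeMs));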
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 55eb05ba396..9fc98db5f6b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -330,7 +330,7 @@ public class Env {
 
     private String metaDir;
     private String bdbDir;
-    private String imageDir;
+    protected String imageDir;
 
     private MetaContext metaContext;
     private long epoch = 0;
@@ -380,8 +380,8 @@ public class Env {
 
     private ColumnIdFlushDaemon columnIdFlusher;
 
-    private boolean isFirstTimeStartUp = false;
-    private boolean isElectable;
+    protected boolean isFirstTimeStartUp = false;
+    protected boolean isElectable;
     // set to true after finished replay all meta and ready to serve
     // set to false when catalog is not ready.
     private AtomicBoolean isReady = new AtomicBoolean(false);
@@ -395,8 +395,8 @@ public class Env {
     private BlockingQueue<FrontendNodeType> typeTransferQueue;
 
     // node name is used for bdbje NodeName.
-    private String nodeName;
-    private FrontendNodeType role;
+    protected String nodeName;
+    protected FrontendNodeType role;
     private FrontendNodeType feType;
     // replica and observer use this value to decide provide read service or not
     private long synchronizedTimeMs;
@@ -405,19 +405,19 @@ public class Env {
     private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE);
 
     private EditLog editLog;
-    private int clusterId;
-    private String token;
+    protected int clusterId;
+    protected String token;
     // For checkpoint and observer memory replayed marker
     private AtomicLong replayedJournalId;
 
     private static Env CHECKPOINT = null;
     private static long checkpointThreadId = -1;
     private Checkpoint checkpointer;
-    private List<HostInfo> helperNodes = Lists.newArrayList();
-    private HostInfo selfNode = null;
+    protected List<HostInfo> helperNodes = Lists.newArrayList();
+    protected HostInfo selfNode = null;
 
     // node name -> Frontend
-    private ConcurrentHashMap<String, Frontend> frontends;
+    protected ConcurrentHashMap<String, Frontend> frontends;
     // removed frontends' name. used for checking if name is duplicated in bdbje
     private ConcurrentLinkedQueue<String> removedFrontends;
 
@@ -1067,7 +1067,7 @@ public class Env {
         this.httpReady.set(httpReady);
     }
 
-    private void getClusterIdAndRole() throws IOException {
+    protected void getClusterIdAndRole() throws IOException {
         File roleFile = new File(this.imageDir, Storage.ROLE_FILE);
         File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
 
@@ -1278,7 +1278,7 @@ public class Env {
 
     // Get the role info and node name from helper node.
     // return false if failed.
-    private boolean getFeNodeTypeAndNameFromHelpers() {
+    protected boolean getFeNodeTypeAndNameFromHelpers() {
         // we try to get info from helper nodes, once we get the right helper node,
         // other helper nodes will be ignored and removed.
         HostInfo rightHelperNode = null;
@@ -1584,7 +1584,7 @@ public class Env {
     }
 
     // start all daemon threads only running on Master
-    private void startMasterOnlyDaemonThreads() {
+    protected void startMasterOnlyDaemonThreads() {
         // start checkpoint thread
         checkpointer = new Checkpoint(editLog);
         checkpointer.setMetaContext(metaContext);
@@ -1781,7 +1781,7 @@ public class Env {
         }
     }
 
-    private boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException {
+    protected boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException {
         try {
             String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port)
                     + "/version";
@@ -1797,7 +1797,7 @@ public class Env {
         return false;
     }
 
-    private void getNewImage(HostInfo helperNode) throws IOException {
+    protected void getNewImage(HostInfo helperNode) throws IOException {
         long localImageVersion = 0;
         Storage storage = new Storage(this.imageDir);
         localImageVersion = storage.getLatestImageSeq();
@@ -1829,7 +1829,7 @@ public class Env {
         }
     }
 
-    private boolean isMyself() {
+    protected boolean isMyself() {
         Preconditions.checkNotNull(selfNode);
         Preconditions.checkNotNull(helperNodes);
         LOG.debug("self: {}. helpers: {}", selfNode, helperNodes);
@@ -2762,7 +2762,7 @@ public class Env {
         };
     }
 
-    public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
+    public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException {
         if (!tryLock(false)) {
             throw new DdlException("Failed to acquire env lock. Try again");
         }
@@ -2774,7 +2774,10 @@ public class Env {
             if (Config.enable_fqdn_mode && StringUtils.isEmpty(host)) {
                 throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true");
             }
-            String nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
+
+            if (Strings.isNullOrEmpty(nodeName)) {
+                nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
+            }
 
             if (removedFrontends.contains(nodeName)) {
                 throw new DdlException("frontend name already exists " + nodeName + ". Try again");
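
Relaxing these Env members and startup hooks from private to protected is what
allows the new CloudEnv subclass (below) to reuse Env's startup machinery while
overriding the cloud-specific parts. A minimal sketch of the override pattern
(bodies elided; only the shape is shown):

    public class CloudEnv extends Env {
        @Override
        protected void getClusterIdAndRole() throws IOException {
            // resolve role and node name from the meta service instead of only local files
        }
        @Override
        protected void startMasterOnlyDaemonThreads() {
            super.startMasterOnlyDaemonThreads();
            // plus cloud-only daemons such as CloudClusterChecker
        }
    }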
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
new file mode 100644
index 00000000000..527909c7bf0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
+import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.metric.GaugeMetricImpl;
+import org.apache.doris.metric.Metric.MetricUnit;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.Frontend;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class CloudClusterChecker extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CloudClusterChecker.class);
+
+    public CloudClusterChecker() {
+        super("cloud cluster check", Config.cloud_cluster_check_interval_second * 1000L);
+    }
+
+    /**
+     * Diff 2 collections of current and the dest.
+     * @param toAdd output param = (expectedState - currentState)
+     * @param toDel output param = (currentState - expectedState)
+     * @param supplierCurrentMapFunc get the current be or fe objects information map from memory, a lambda function
+     * @param supplierNodeMapFunc get be or fe information map from meta_service return pb, a lambda function
+     */
+    private <T> void diffNodes(List<T> toAdd, List<T> toDel, Supplier<Map<String, T>> supplierCurrentMapFunc,
+                               Supplier<Map<String, T>> supplierNodeMapFunc) {
+        if (toAdd == null || toDel == null) {
+            return;
+        }
+
+        // TODO(gavin): Consider VPC
+        // vpc:ip:port -> Nodes
+        Map<String, T> currentMap = supplierCurrentMapFunc.get();
+        Map<String, T> nodeMap = supplierNodeMapFunc.get();
+
+        LOG.debug("current Nodes={} expected Nodes={}", currentMap.keySet(), nodeMap.keySet());
+
+        toDel.addAll(currentMap.keySet().stream().filter(i -> !nodeMap.containsKey(i))
+                .map(currentMap::get).collect(Collectors.toList()));
+
+        toAdd.addAll(nodeMap.keySet().stream().filter(i -> !currentMap.containsKey(i))
+                .map(nodeMap::get).collect(Collectors.toList()));
+    }
+
+    private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<String> localClusterIds) {
+        List<String> toAddClusterIds = remoteClusterIdToPB.keySet().stream()
+                .filter(i -> !localClusterIds.contains(i)).collect(Collectors.toList());
+        toAddClusterIds.forEach(
+                addId -> {
+                LOG.debug("begin to add clusterId: {}", addId);
+                // Attach tag to BEs
+                Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+                String clusterName = remoteClusterIdToPB.get(addId).getClusterName();
+                String clusterId = remoteClusterIdToPB.get(addId).getClusterId();
+                String publicEndpoint = remoteClusterIdToPB.get(addId).getPublicEndpoint();
+                String privateEndpoint = remoteClusterIdToPB.get(addId).getPrivateEndpoint();
+                newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+                newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+                newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
+                newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
+                // For old versions that do not have the status field set
+                ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
+                        ? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
+                newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
+                MetricRepo.registerClusterMetrics(clusterName, clusterId);
+                //toAdd.forEach(i -> i.setTagMap(newTagMap));
+                List<Backend> toAdd = new ArrayList<>();
+                for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) {
+                    String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+                    if (Strings.isNullOrEmpty(addr)) {
+                        LOG.warn("can't get valid addr from ms {}", node);
+                        continue;
+                    }
+                    Backend b = new Backend(Env.getCurrentEnv().getNextId(), addr, node.getHeartbeatPort());
+                    newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
+                    b.setTagMap(newTagMap);
+                    toAdd.add(b);
+                }
+                Env.getCurrentSystemInfo().updateCloudBackends(toAdd, new ArrayList<>());
+            }
+        );
+    }
+
+    private void checkToDelCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<String> localClusterIds,
+                                   Map<String, List<Backend>> clusterIdToBackend) {
+        List<String> toDelClusterIds = localClusterIds.stream()
+                .filter(i -> !remoteClusterIdToPB.containsKey(i)).collect(Collectors.toList());
+        // drop be cluster
+        Map<String, List<Backend>> finalClusterIdToBackend = clusterIdToBackend;
+        toDelClusterIds.forEach(
+                delId -> {
+                LOG.debug("begin to drop clusterId: {}", delId);
+                List<Backend> toDel =
+                        new ArrayList<>(finalClusterIdToBackend.getOrDefault(delId, new ArrayList<>()));
+                Env.getCurrentSystemInfo().updateCloudBackends(new ArrayList<>(), toDel);
+                // del clusterName
+                String delClusterName = Env.getCurrentSystemInfo().getClusterNameByClusterId(delId);
+                if (delClusterName.isEmpty()) {
+                    LOG.warn("can't get delClusterName, clusterId: {}, please check", delId);
+                    return;
+                }
+                // del clusterID
+                Env.getCurrentSystemInfo().dropCluster(delId, delClusterName);
+            }
+        );
+    }
+
+    private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expectedBes) {
+        Map<String, Backend> currentMap = new HashMap<>();
+        for (Backend be : currentBes) {
+            String endpoint = be.getHost() + ":" + be.getHeartbeatPort();
+            currentMap.put(endpoint, be);
+        }
+
+        for (Cloud.NodeInfoPB node : expectedBes) {
+            String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+            if (Strings.isNullOrEmpty(addr)) {
+                LOG.warn("can't get valid addr from ms {}", node);
+                continue;
+            }
+            String endpoint = addr + ":" + node.getHeartbeatPort();
+            Cloud.NodeStatusPB status = node.getStatus();
+            Backend be = currentMap.get(endpoint);
+
+            if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
+                if (!be.isDecommissioned()) {
+                    LOG.info("decommissioned backend: {} status: {}", be, status);
+                    // TODO(merge-cloud): add it when has CloudUpgradeMgr.
+                    /*
+                    try {
+                    } catch (AnalysisException e) {
+                        LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
+                    }
+                    be.setDecommissioned(true);
+                     */
+                }
+            }
+        }
+    }
+
+    private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,
+                               Map<String, List<Backend>> clusterIdToBackend) {
+        for (String cid : clusterIdToBackend.keySet()) {
+            List<Backend> toAdd = new ArrayList<>();
+            List<Backend> toDel = new ArrayList<>();
+            ClusterPB cp = remoteClusterIdToPB.get(cid);
+            if (cp == null) {
+                LOG.warn("can't get cid {} info, and local cluster info {}, remote cluster info {}",
+                        cid, clusterIdToBackend, remoteClusterIdToPB);
+                continue;
+            }
+            String newClusterName = cp.getClusterName();
+            List<Backend> currentBes = clusterIdToBackend.getOrDefault(cid, new ArrayList<>());
+            String currentClusterName = currentBes.stream().map(Backend::getCloudClusterName).findFirst().orElse("");
+
+            if (!newClusterName.equals(currentClusterName)) {
+                // rename cluster's name
+                LOG.info("cluster_name corresponding to cluster_id has been changed,"
+                        + " cluster_id : {} , current_cluster_name : {}, new_cluster_name :{}",
+                        cid, currentClusterName, newClusterName);
+                // change all be's cluster_name
+                currentBes.forEach(b -> b.setCloudClusterName(newClusterName));
+                // update clusterNameToId
+                Env.getCurrentSystemInfo().updateClusterNameToId(newClusterName, currentClusterName, cid);
+                // update tags
+                currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b));
+            }
+
+            String currentClusterStatus = Env.getCurrentSystemInfo().getCloudStatusById(cid);
+
+            // For old versions that do not have the status field set
+            ClusterStatus clusterStatus = cp.hasClusterStatus() ? cp.getClusterStatus() : ClusterStatus.NORMAL;
+            String newClusterStatus = String.valueOf(clusterStatus);
+            LOG.debug("current cluster status {} {}", currentClusterStatus, newClusterStatus);
+            if (!currentClusterStatus.equals(newClusterStatus)) {
+                // cluster's status changed
+                LOG.info("cluster_status corresponding to cluster_id has been changed,"
+                        + " cluster_id : {} , current_cluster_status : {}, new_cluster_status :{}",
+                        cid, currentClusterStatus, newClusterStatus);
+                // change all be's cluster_status
+                currentBes.forEach(b -> b.setCloudClusterStatus(newClusterStatus));
+                // update tags
+                currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b));
+            }
+
+            List<String> currentBeEndpoints = currentBes.stream().map(backend ->
+                    backend.getHost() + ":" + backend.getHeartbeatPort()).collect(Collectors.toList());
+            List<Cloud.NodeInfoPB> expectedBes = remoteClusterIdToPB.get(cid).getNodesList();
+            List<String> remoteBeEndpoints = expectedBes.stream()
+                    .map(pb -> {
+                        String addr = Config.enable_fqdn_mode ? pb.getHost() : pb.getIp();
+                        if (Strings.isNullOrEmpty(addr)) {
+                            LOG.warn("can't get valid addr from ms {}", pb);
+                            return "";
+                        }
+                        return addr + ":" + pb.getHeartbeatPort();
+                    }).filter(e -> !Strings.isNullOrEmpty(e))
+                    .collect(Collectors.toList());
+            LOG.info("get cloud cluster, clusterId={} local nodes={} remote nodes={}", cid,
+                    currentBeEndpoints, remoteBeEndpoints);
+
+            updateStatus(currentBes, expectedBes);
+
+            // Attach tag to BEs
+            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
+            newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
+            newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
+            newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, remoteClusterIdToPB.get(cid).getPrivateEndpoint());
+
+            diffNodes(toAdd, toDel, () -> {
+                Map<String, Backend> currentMap = new HashMap<>();
+                for (Backend be : currentBes) {
+                    String endpoint = be.getHost() + ":" + be.getHeartbeatPort()
+                            + be.getCloudPublicEndpoint() + be.getCloudPrivateEndpoint();
+                    currentMap.put(endpoint, be);
+                }
+                return currentMap;
+            }, () -> {
+                Map<String, Backend> nodeMap = new HashMap<>();
+                for (Cloud.NodeInfoPB node : expectedBes) {
+                    String host = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+                    if (Strings.isNullOrEmpty(host)) {
+                        LOG.warn("can't get valid addr from ms {}", node);
+                        continue;
+                    }
+                    String endpoint = host + ":" + node.getHeartbeatPort()
+                            + remoteClusterIdToPB.get(cid).getPublicEndpoint()
+                            + remoteClusterIdToPB.get(cid).getPrivateEndpoint();
+                    Backend b = new Backend(Env.getCurrentEnv().getNextId(), host, node.getHeartbeatPort());
+                    if (node.hasIsSmoothUpgrade()) {
+                        b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
+                    }
+                    newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
+                    b.setTagMap(newTagMap);
+                    nodeMap.put(endpoint, b);
+                }
+                return nodeMap;
+            });
+
+            LOG.debug("cluster_id: {}, diffBackends nodes: {}, current: {}, toAdd: {}, toDel: {}",
+                    cid, expectedBes, currentBes, toAdd, toDel);
+            if (toAdd.isEmpty() && toDel.isEmpty()) {
+                LOG.debug("runAfterCatalogReady nothing todo");
+                continue;
+            }
+
+            Env.getCurrentSystemInfo().updateCloudBackends(toAdd, toDel);
+        }
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        // rpc to ms, to get the cluster_ids the mysql user can use
+        // NOTE: rpc args are all empty; use cloud_unique_id to get all cluster info of this instance.
+        Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
+                && response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, response={}", Config.cloud_unique_id, response);
+        } else {
+            // clusterId -> clusterPB
+            Map<String, ClusterPB> remoteClusterIdToPB = new HashMap<>();
+            Set<String> localClusterIds = clusterIdToBackend.keySet();
+
+            try {
+                // cluster_ids diff remote <clusterId, nodes> and local <clusterId, nodes>
+                // remote - local > 0, add bes to local
+                checkToAddCluster(remoteClusterIdToPB, localClusterIds);
+
+                // local - remote > 0, drop bes from local
+                checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend);
+
+                if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) {
+                    LOG.warn("impossible, cluster id sizes do not match, check local {}, remote {}",
+                            clusterIdToBackend, remoteClusterIdToPB);
+                }
+                // clusterID local == remote, diff nodes
+                checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
+
+                // check mem map
+                checkFeNodesMapValid();
+            } catch (Exception e) {
+                LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
+            }
+        }
+
+        // Metric
+        clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        Map<String, String> clusterNameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId();
+        for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+            long aliveNum = 0L;
+            List<Backend> bes = clusterIdToBackend.get(entry.getValue());
+            if (bes == null || bes.size() == 0) {
+                LOG.info("can't get be nodes by cluster {}, bes {}", entry, bes);
+                continue;
+            }
+            for (Backend backend : bes) {
+                MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> {
+                    GaugeMetricImpl<Integer> backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
+                            "backend alive or not");
+                    backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue()));
+                    backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey()));
+                    backendAlive.addLabel(new MetricLabel("address", key));
+                    MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
+                    return backendAlive;
+                }).setValue(backend.isAlive() ? 1 : 0);
+                aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
+            }
+
+            MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> {
+                GaugeMetricImpl<Long> backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total",
+                        MetricUnit.NOUNIT, "backend alive num in cluster");
+                backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue()));
+                backendAliveTotal.addLabel(new MetricLabel("cluster_name", key));
+                MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
+                return backendAliveTotal;
+            }).setValue(aliveNum);
+        }
+
+        LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {}",
+                Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
+        getObserverFes();
+    }
+
+    private void checkFeNodesMapValid() {
+        LOG.debug("begin checkFeNodesMapValid");
+        Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        Set<String> clusterIds = new HashSet<>();
+        Set<String> clusterNames = new HashSet<>();
+        clusterIdToBackend.forEach((clusterId, bes) -> {
+            if (bes.isEmpty()) {
+                LOG.warn("impossible, somewhere err, clusterId {}, clusterIdToBeMap {}", clusterId, clusterIdToBackend);
+                clusterIdToBackend.remove(clusterId);
+            }
+            bes.forEach(be -> {
+                clusterIds.add(be.getCloudClusterId());
+                clusterNames.add(be.getCloudClusterName());
+            });
+        });
+
+        Map<String, String> nameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId();
+        nameToId.forEach((clusterName, clusterId) -> {
+            if (!clusterIdToBackend.containsKey(clusterId)) {
+                LOG.warn("impossible, somewhere err, clusterId {}, clusterName {}, clusterNameToIdMap {}",
+                        clusterId, clusterName, nameToId);
+                nameToId.remove(clusterName);
+            }
+        });
+
+        if (!clusterNames.containsAll(nameToId.keySet()) || !nameToId.keySet().containsAll(clusterNames)) {
+            LOG.warn("impossible, somewhere err, clusterNames {}, nameToId {}", clusterNames, nameToId);
+        }
+        if (!clusterIds.containsAll(nameToId.values()) || !nameToId.values().containsAll(clusterIds)) {
+            LOG.warn("impossible, somewhere err, clusterIds {}, nameToId {}", clusterIds, nameToId);
+        }
+        if (!clusterIds.containsAll(clusterIdToBackend.keySet())
+                || !clusterIdToBackend.keySet().containsAll(clusterIds)) {
+            LOG.warn("impossible, somewhere err, clusterIds {}, clusterIdToBackend {}",
+                    clusterIds, clusterIdToBackend);
+        }
+    }
+
+    private void getObserverFes() {
+        Cloud.GetClusterResponse response = CloudSystemInfoService
+                .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return;
+        }
+        // Note: get_cluster interface cluster(option -> repeated), so it has at least one cluster.
+        if (response.getClusterCount() == 0) {
+            LOG.warn("meta service error, returned zero clusters, please check it, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return;
+        }
+
+        ClusterPB cpb = response.getCluster(0);
+        LOG.debug("get cloud cluster, clusterId={} nodes={}", Config.cloud_sql_server_cluster_id, cpb.getNodesList());
+        List<Frontend> currentFes = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+        List<Frontend> toAdd = new ArrayList<>();
+        List<Frontend> toDel = new ArrayList<>();
+        List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();
+        diffNodes(toAdd, toDel, () -> {
+            Map<String, Frontend> currentMap = new HashMap<>();
+            String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
+            for (Frontend fe : currentFes) {
+                String endpoint = fe.getHost() + "_" + fe.getEditLogPort();
+                if (selfNode.equals(endpoint)) {
+                    continue;
+                }
+                currentMap.put(endpoint, fe);
+            }
+            return currentMap;
+        }, () -> {
+            Map<String, Frontend> nodeMap = new HashMap<>();
+            String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
+            for (Cloud.NodeInfoPB node : expectedFes) {
+                String host = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+                if (Strings.isNullOrEmpty(host)) {
+                    LOG.warn("can't get valid addr from ms {}", node);
+                    continue;
+                }
+                String endpoint = host + "_" + node.getEditLogPort();
+                if (selfNode.equals(endpoint)) {
+                    continue;
+                }
+                Frontend fe = new Frontend(FrontendNodeType.OBSERVER,
+                        CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(),
+                        node.getCtime() * 1000), host, node.getEditLogPort());
+                nodeMap.put(endpoint, fe);
+            }
+            return nodeMap;
+        });
+        LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}",
+                expectedFes, currentFes, toAdd, toDel);
+        if (toAdd.isEmpty() && toDel.isEmpty()) {
+            LOG.debug("runAfterCatalogReady getObserverFes nothing todo");
+            return;
+        }
+        try {
+            CloudSystemInfoService.updateFrontends(toAdd, toDel);
+        } catch (DdlException e) {
+            LOG.warn("update cloud frontends exception e: {}, msg: {}", e, e.getMessage());
+        }
+    }
+}
+
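
The reconciliation primitive above is diffNodes: both suppliers key nodes by an
endpoint-like string, and the two set differences become toAdd/toDel. A small
self-contained sketch of the same idea (names and endpoints are illustrative,
not from the patch):

    Map<String, String> current = Map.of("10.0.0.1:9050", "beA", "10.0.0.2:9050", "beB");
    Map<String, String> expected = Map.of("10.0.0.2:9050", "beB", "10.0.0.3:9050", "beC");
    List<String> toAdd = expected.keySet().stream()
            .filter(k -> !current.containsKey(k)).map(expected::get).collect(Collectors.toList());
    List<String> toDel = current.keySet().stream()
            .filter(k -> !expected.containsKey(k)).map(current::get).collect(Collectors.toList());
    // toAdd == [beC], toDel == [beA]; the checker applies these via updateCloudBackends(toAdd, toDel).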
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index d6e6b51f2b2..888eee5a0c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -18,20 +18,331 @@
 package org.apache.doris.cloud.catalog;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.NodeInfoPB;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.io.CountingDataOutputStream;
+import org.apache.doris.common.util.HttpURLUtil;
+import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.httpv2.meta.MetaBaseAction;
+import org.apache.doris.persist.Storage;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class CloudEnv extends Env {
 
     private static final Logger LOG = LogManager.getLogger(CloudEnv.class);
 
+    private CloudClusterChecker cloudClusterCheck;
+
     public CloudEnv(boolean isCheckpointCatalog) {
         super(isCheckpointCatalog);
+        this.cloudClusterCheck = new CloudClusterChecker();
+    }
+
+    protected void startMasterOnlyDaemonThreads() {
+        LOG.info("start cloud Master only daemon threads");
+        super.startMasterOnlyDaemonThreads();
+        cloudClusterCheck.start();
+    }
+
+    public static String genFeNodeNameFromMeta(String host, int port, long timeMs) {
+        return host + "_" + port + "_" + timeMs;
+    }
+
+    private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
+        // get helperNodes from ms
+        Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster(
+                Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return null;
+        }
+        LOG.info("get cluster response from meta service {}", response);
+        // Note: get_cluster interface cluster(option -> repeated), so it has at least one cluster.
+        if (response.getClusterCount() == 0) {
+            LOG.warn("meta service error, returned zero clusters, please check it, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return null;
+        }
+        List<Cloud.NodeInfoPB> allNodes = response.getCluster(0).getNodesList()
+                .stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());
+
+        helperNodes.clear();
+        helperNodes.addAll(allNodes.stream()
+                .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == NodeInfoPB.NodeType.FE_MASTER)
+                .map(nodeInfoPB -> new HostInfo(
+                Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
+                .collect(Collectors.toList()));
+        // check only have one master node.
+        Preconditions.checkState(helperNodes.size() == 1);
+
+        Optional<NodeInfoPB> local = allNodes.stream().filter(n -> ((Config.enable_fqdn_mode ? n.getHost() : n.getIp())
+                + "_" + n.getEditLogPort()).equals(selfNode.getIdent())).findAny();
+        return local.orElse(null);
+    }
+
+
+    protected void getClusterIdAndRole() throws IOException {
+        NodeInfoPB.NodeType type = NodeInfoPB.NodeType.UNKNOWN;
+        String feNodeNameFromMeta = "";
+        // cloud mode
+        while (true) {
+            Cloud.NodeInfoPB nodeInfoPB = null;
+            try {
+                nodeInfoPB = getLocalTypeFromMetaService();
+            } catch (Exception e) {
+                LOG.warn("failed to get local fe's type, sleep 5 s, try again. exception: {}", e.getMessage());
+            }
+            if (nodeInfoPB == null) {
+                LOG.warn("failed to get local fe's type, sleep 5 s, try again.");
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    LOG.warn("thread sleep Exception", e);
+                }
+                continue;
+            }
+            type = nodeInfoPB.getNodeType();
+            feNodeNameFromMeta = genFeNodeNameFromMeta(
+                Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(),
+                nodeInfoPB.getEditLogPort(), nodeInfoPB.getCtime() * 1000);
+            break;
+        }
+
+        LOG.info("current fe's role is {}", type == NodeInfoPB.NodeType.FE_MASTER ? "MASTER" :
+                type == NodeInfoPB.NodeType.FE_OBSERVER ? "OBSERVER" : "UNKNOWN");
+        if (type == NodeInfoPB.NodeType.UNKNOWN) {
+            LOG.warn("current type is not supported, please check it");
+            System.exit(-1);
+        }
+        File roleFile = new File(super.imageDir, Storage.ROLE_FILE);
+        File versionFile = new File(super.imageDir, Storage.VERSION_FILE);
+
+        // if helper node points to self, or there are ROLE and VERSION files in local.
+        // get the node type from local
+        if ((type == NodeInfoPB.NodeType.FE_MASTER) || type != NodeInfoPB.NodeType.FE_OBSERVER
+                && (isMyself() || (roleFile.exists() && versionFile.exists()))) {
+            if (!isMyself()) {
+                LOG.info("find ROLE and VERSION file in local, ignore helper nodes: {}", helperNodes);
+            }
+
+            // check file integrity, if has.
+            if ((roleFile.exists() && !versionFile.exists()) || (!roleFile.exists() && versionFile.exists())) {
+                throw new IOException("role file and version file must both exist or both not exist. "
+                    + "please specify one helper node to recover. will exit.");
+            }
+
+            // ATTN:
+            // If the version file and role file does not exist and the helper node is itself,
+            // this should be the very beginning startup of the cluster, so we create ROLE and VERSION file,
+            // set isFirstTimeStartUp to true, and add itself to frontends list.
+            // If ROLE and VERSION file is deleted for some reason, we may arbitrarily start this node as
+            // FOLLOWER, which may cause UNDEFINED behavior.
+            // Everything may be OK if the origin role is exactly FOLLOWER,
+            // but if not, FE process will exit somehow.
+            Storage storage = new Storage(this.imageDir);
+            if (!roleFile.exists()) {
+                // The very first time to start the first node of the cluster.
+                // It should become a Master node (Master node's role is also FOLLOWER, which means electable)
+
+                // For compatibility. Because this is the very first time to start, so we arbitrarily choose
+                // a new name for this node
+                role = FrontendNodeType.FOLLOWER;
+                if (type == NodeInfoPB.NodeType.FE_MASTER) {
+                    nodeName = feNodeNameFromMeta;
+                } else {
+                    nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), false /* new style */);
+                }
+
+                storage.writeFrontendRoleAndNodeName(role, nodeName);
+                LOG.info("very first time to start this node. role: {}, node name: {}", role.name(), nodeName);
+            } else {
+                role = storage.getRole();
+                if (role == FrontendNodeType.REPLICA) {
+                    // for compatibility
+                    role = FrontendNodeType.FOLLOWER;
+                }
+
+                nodeName = storage.getNodeName();
+                if (Strings.isNullOrEmpty(nodeName)) {
+                    // In normal case, if ROLE file exists, role and nodeName should both exist.
+                    // But we will get an empty nodeName after upgrading.
+                    // So for forward compatibility, we use the "old-style" way of naming: "ip_port",
+                    // and update the ROLE file.
+                    if (type == NodeInfoPB.NodeType.FE_MASTER) {
+                        nodeName = feNodeNameFromMeta;
+                    } else {
+                        nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), true /* old style */);
+                    }
+                    storage.writeFrontendRoleAndNodeName(role, nodeName);
+                    LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName);
+                }
+                // Notice:
+                // With the introduction of FQDN, the nodeName is no longer bound to an IP address,
+                // so consistency is no longer checked here. Otherwise, the startup will fail.
+            }
+
+            Preconditions.checkNotNull(role);
+            Preconditions.checkNotNull(nodeName);
+
+            if (!versionFile.exists()) {
+                clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id;
+                token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
+                storage = new Storage(clusterId, token, this.imageDir);
+                storage.writeClusterIdAndToken();
+
+                isFirstTimeStartUp = true;
+                Frontend self = new Frontend(role, nodeName, selfNode.getHost(), selfNode.getPort());
+                // Set self alive to true, the BDBEnvironment.getReplicationGroupAdmin() will rely on this to get
+                // helper node, before the heartbeat thread is started.
+                self.setIsAlive(true);
+                // We don't need to check if frontends already contains self.
+                // frontends must be empty cause no image is loaded and no journal is replayed yet.
+                // And this frontend will be persisted later after opening bdbje environment.
+                frontends.put(nodeName, self);
+                LOG.info("add self frontend: {}", self);
+            } else {
+                clusterId = storage.getClusterID();
+                if (storage.getToken() == null) {
+                    token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
+                    LOG.info("new token={}", token);
+                    storage.setToken(token);
+                    storage.writeClusterIdAndToken();
+                } else {
+                    token = storage.getToken();
+                }
+                isFirstTimeStartUp = false;
+            }
+        } else {
+            // cloud mode, type == NodeInfoPB.NodeType.FE_OBSERVER
+            // try to get role and node name from helper node,
+            // this loop will not end until we get certain role type and name
+            while (true) {
+                if (!getFeNodeTypeAndNameFromHelpers()) {
+                    LOG.warn("current node is not added to the group. please add it first. "
+                            + "sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
+                    try {
+                        Thread.sleep(5000);
+                        continue;
+                    } catch (InterruptedException e) {
+                        LOG.warn("", e);
+                        System.exit(-1);
+                    }
+                }
+
+                if (role == FrontendNodeType.REPLICA) {
+                    // for compatibility
+                    role = FrontendNodeType.FOLLOWER;
+                }
+                break;
+            }
+
+            Preconditions.checkState(helperNodes.size() == 1);
+            Preconditions.checkNotNull(role);
+            Preconditions.checkNotNull(nodeName);
+
+            HostInfo rightHelperNode = helperNodes.get(0);
+
+            Storage storage = new Storage(this.imageDir);
+            if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
+                    || !roleFile.exists()) {
+                storage.writeFrontendRoleAndNodeName(role, nodeName);
+            }
+            if (!versionFile.exists()) {
+                // If the version file doesn't exist, download it from helper node
+                if (!getVersionFileFromHelper(rightHelperNode)) {
+                    throw new IOException("fail to download version file from "
+                        + rightHelperNode.getHost() + " will exit.");
+                }
+
+                // NOTE: cluster_id will be init when Storage object is constructed,
+                //       so we new one.
+                storage = new Storage(this.imageDir);
+                clusterId = storage.getClusterID();
+                token = storage.getToken();
+                if (Strings.isNullOrEmpty(token)) {
+                    token = Config.auth_token;
+                }
+                LOG.info("get version file from helper, cluster id {}, token {}", clusterId, token);
+            } else {
+                // If the version file exist, read the cluster id and check the
+                // id with helper node to make sure they are identical
+                clusterId = storage.getClusterID();
+                token = storage.getToken();
+                LOG.info("check local cluster id {} and token {} via helper node", clusterId, token);
+                try {
+                    String url = "http://" + NetUtils
+                            .getHostPortInAccessibleFormat(rightHelperNode.getHost(), Config.http_port) + "/check";
+                    HttpURLConnection conn = HttpURLUtil.getConnectionWithNodeIdent(url);
+                    conn.setConnectTimeout(2 * 1000);
+                    conn.setReadTimeout(2 * 1000);
+                    String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID);
+                    int remoteClusterId = Integer.parseInt(clusterIdString);
+                    if (remoteClusterId != clusterId) {
+                        LOG.error("cluster id is not equal with helper node {}. will exit. remote:{}, local:{}",
+                                rightHelperNode.getHost(), clusterIdString, clusterId);
+                        throw new IOException(
+                            "cluster id is not equal with helper node "
+                                + rightHelperNode.getHost() + ". will exit.");
+                    }
+                    String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN);
+                    if (token == null && remoteToken != null) {
+                        LOG.info("get token from helper node. token={}.", remoteToken);
+                        token = remoteToken;
+                        storage.writeClusterIdAndToken();
+                        storage.reload();
+                    }
+                    if (Config.enable_token_check) {
+                        Preconditions.checkNotNull(token);
+                        Preconditions.checkNotNull(remoteToken);
+                        if (!token.equals(remoteToken)) {
+                            throw new IOException(
+                                "token is not equal with helper node " + 
rightHelperNode.getHost()
+                                    + ", local token " + token + ", remote 
token " + remoteToken + ". will exit.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IOException("fail to check cluster_id and token with helper node.", e);
+                }
+            }
+
+            getNewImage(rightHelperNode);
+        }
+
+        if (Config.cluster_id != -1 && clusterId != Config.cluster_id) {
+            throw new IOException("cluster id is not equal with config item cluster_id. will exit. "
+                + "If you are in recovery mode, please also modify the cluster_id in 'doris-meta/image/VERSION'");
+        }
+
+        if (role.equals(FrontendNodeType.FOLLOWER)) {
+            isElectable = true;
+        } else {
+            isElectable = false;
+        }
+
+        Preconditions.checkState(helperNodes.size() == 1);
+        LOG.info("finished to get cluster id: {}, isElectable: {}, role: {}, node name: {}, token: {}",
+                clusterId, isElectable, role.name(), nodeName, token);
     }
 
     @Override
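
Observer FE node names in cloud mode are derived from the meta service rather
than generated locally: genFeNodeNameFromMeta concatenates host, edit-log port,
and the node's ctime (seconds, scaled to ms in the callers). For example
(values illustrative):

    // genFeNodeNameFromMeta("10.0.0.5", 9010, 1706025600L * 1000)
    // returns "10.0.0.5_9010_1706025600000"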
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 5ae16c9f35b..681f7e195e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -17,18 +17,36 @@
 
 package org.apache.doris.cloud.system;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 public class CloudSystemInfoService extends SystemInfoService {
+    private static final Logger LOG = LogManager.getLogger(CloudSystemInfoService.class);
 
     @Override
     public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
@@ -39,5 +57,196 @@ public class CloudSystemInfoService extends SystemInfoService {
         return Maps.newHashMap();
     }
 
+    /**
+     * Gets cloud cluster from remote with either clusterId or clusterName
+     *
+     * @param clusterName cluster name
+     * @param clusterId cluster id
+     * @param userName mysql user name
+     * @return the GetClusterResponse returned by the meta service
+     */
+    public static Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) {
+        Cloud.GetClusterRequest.Builder builder = Cloud.GetClusterRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id)
+                .setClusterName(clusterName).setClusterId(clusterId).setMysqlUserName(userName);
+        final Cloud.GetClusterRequest pRequest = builder.build();
+        Cloud.GetClusterResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().getCluster(pRequest);
+            return response;
+        } catch (RpcException e) {
+            LOG.warn("rpcToMetaGetClusterInfo exception: {}", e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    public synchronized void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+        // Deduplicate and validate
+        if (toAdd.size() == 0 && toDel.size() == 0) {
+            LOG.info("nothing to do");
+            return;
+        }
+        Set<String> existedBes = idToBackendRef.entrySet().stream().map(i -> i.getValue())
+                .map(i -> i.getHost() + ":" + i.getHeartbeatPort())
+                .collect(Collectors.toSet());
+        LOG.debug("deduplication existedBes={}", existedBes);
+        LOG.debug("before deduplication toAdd={} toDel={}", toAdd, toDel);
+        toAdd = toAdd.stream().filter(i -> !existedBes.contains(i.getHost() + ":" + i.getHeartbeatPort()))
+            .collect(Collectors.toList());
+        toDel = toDel.stream().filter(i -> existedBes.contains(i.getHost() + ":" + i.getHeartbeatPort()))
+            .collect(Collectors.toList());
+        LOG.debug("after deduplication toAdd={} toDel={}", toAdd, toDel);
+
+        Map<String, List<Backend>> existedHostToBeList = idToBackendRef.values().stream().collect(Collectors.groupingBy(
+                Backend::getHost));
+        for (Backend be : toAdd) {
+            Env.getCurrentEnv().getEditLog().logAddBackend(be);
+            LOG.info("added cloud backend={} ", be);
+            // backends have changed, regenerate tablet number metrics
+            MetricRepo.generateBackendsTabletMetrics();
+
+            String host = be.getHost();
+            if (existedHostToBeList.containsKey(host)) {
+                if (be.isSmoothUpgradeDst()) {
+                    LOG.info("a new BE process will start on an existing node for smooth upgrading");
+                    int beNum = existedHostToBeList.get(host).size();
+                    Backend colocatedBe = existedHostToBeList.get(host).get(0);
+                    if (beNum != 1) {
+                        LOG.warn("found multiple co-located BEs, num: {}, select the 1st {} as migration src", beNum,
+                                colocatedBe.getId());
+                    }
+                    colocatedBe.setSmoothUpgradeSrc(true);
+                    handleNewBeOnSameNode(colocatedBe, be);
+                } else {
+                    LOG.warn("a new BE process will start on an existing node, this should not happen except in tests");
+                }
+            }
+        }
+        for (Backend be : toDel) {
+            // drop the BE and mark it as not alive
+            be.setAlive(false);
+            be.setLastMissingHeartbeatTime(System.currentTimeMillis());
+            Env.getCurrentEnv().getEditLog().logDropBackend(be);
+            LOG.info("dropped cloud backend={}, and lastMissingHeartbeatTime={}", be, be.getLastMissingHeartbeatTime());
+            // backends have changed, regenerate tablet number metrics
+            MetricRepo.generateBackendsTabletMetrics();
+        }
+
+        // Update idToBackendRef
+        Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+        toAdd.forEach(i -> copiedBackends.put(i.getId(), i));
+        toDel.forEach(i -> copiedBackends.remove(i.getId()));
+        idToBackendRef = ImmutableMap.copyOf(copiedBackends);
+
+        // Update idToReportVersionRef
+        Map<Long, AtomicLong> copiedReportVersions = Maps.newHashMap(idToReportVersionRef);
+        toAdd.forEach(i -> copiedReportVersions.put(i.getId(), new AtomicLong(0L)));
+        toDel.forEach(i -> copiedReportVersions.remove(i.getId()));
+        idToReportVersionRef = ImmutableMap.copyOf(copiedReportVersions);
+
+        updateCloudClusterMap(toAdd, toDel);
+    }
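+    // Note: idToBackendRef and idToReportVersionRef are volatile ImmutableMaps replaced
+    // copy-on-write above, so readers always see a consistent snapshot without locking.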
+
+    private void handleNewBeOnSameNode(Backend oldBe, Backend newBe) {
+        LOG.info("new BE {} starts on the same node as existing BE {}", newBe.getId(), oldBe.getId());
+        // TODO(merge-cloud): enable this once CloudTabletRebalancer is merged.
+        // Env.getCurrentEnv().getCloudTabletRebalancer().addTabletMigrationTask(oldBe.getId(), newBe.getId());
+    }
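+    // Design note: a smooth upgrade starts a second BE process on the same host; the old
+    // process is marked as migration src so its tablets can be drained to the new process
+    // once CloudTabletRebalancer lands in the merge-cloud work.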
+
+    public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> toDel) {
+        lock.lock();
+        try {
+            Set<String> clusterNameSet = new HashSet<>();
+            for (Backend b : toAdd) {
+                String clusterName = b.getCloudClusterName();
+                String clusterId = b.getCloudClusterId();
+                if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                    LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
+                    continue;
+                }
+                clusterNameSet.add(clusterName);
+                if (clusterNameSet.size() != 1) {
+                    LOG.warn("toAdd BE list has multiple cluster names, please check, set: {}", clusterNameSet);
+                }
+
+                clusterNameToId.put(clusterName, clusterId);
+                List<Backend> be = clusterIdToBackend.get(clusterId);
+                if (be == null) {
+                    be = new ArrayList<>();
+                    clusterIdToBackend.put(clusterId, be);
+                    MetricRepo.registerClusterMetrics(clusterName, clusterId);
+                }
+                Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort())
+                        .collect(Collectors.toSet());
+                // Deduplicate
+                // TODO(gavin): consider vpc
+                boolean alreadyExisted = existed.contains(b.getHost() + ":" + b.getHeartbeatPort());
+                if (alreadyExisted) {
+                    LOG.info("BE already existed, clusterName={} clusterId={} backendNum={} backend={}",
+                            clusterName, clusterId, be.size(), b);
+                    continue;
+                }
+                be.add(b);
+                LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+                        clusterName, clusterId, be.size(), b);
+            }
+
+            Set<Long> delIds = toDel.stream().map(Backend::getId).collect(Collectors.toSet());
+            for (Backend b : toDel) {
+                String clusterName = b.getCloudClusterName();
+                String clusterId = b.getCloudClusterId();
+                // We actually don't care about the cluster name here
+                if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                    LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
+                    continue;
+                }
+                List<Backend> be = clusterIdToBackend.get(clusterId);
+                if (be == null) {
+                    LOG.warn("try to remove a non-existing cluster, clusterId={} clusterName={}",
+                            clusterId, clusterName);
+                    continue;
+                }
+                be = be.stream().filter(i -> !delIds.contains(i.getId())).collect(Collectors.toList());
+                // ATTN: clusterId may have zero nodes
+                clusterIdToBackend.replace(clusterId, be);
+                // such as dropCluster, but no lock
+                // ATTN: Empty clusters are treated as dropped clusters.
+                if (be.isEmpty()) {
+                    LOG.info("del clusterId {} and clusterName {} because its BE list is empty", clusterId, clusterName);
+                    boolean succ = clusterNameToId.remove(clusterName, clusterId);
+                    if (!succ) {
+                        LOG.warn("impossible, something is wrong, clusterNameToId {}, "
+                                + "want to remove cluster name {}, cluster id {}", clusterNameToId, clusterName, clusterId);
+                    }
+                    clusterIdToBackend.remove(clusterId);
+                }
+                LOG.info("update (del) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+                        clusterName, clusterId, be.size(), b);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
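+    // clusterNameToId and clusterIdToBackend are only mutated under 'lock'; a cluster whose
+    // backend list becomes empty is removed from both maps, i.e. treated as dropped.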
+
+    public static synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> toDel)
+            throws DdlException {
+        LOG.debug("updateFrontends toAdd={} toDel={}", toAdd, toDel);
+        String masterIp = Env.getCurrentEnv().getMasterHost();
+        for (Frontend fe : toAdd) {
+            if (masterIp.equals(fe.getHost())) {
+                continue;
+            }
+            Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER,
+                    fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
+            LOG.info("added cloud frontend={} ", fe);
+        }
+        for (Frontend fe : toDel) {
+            if (masterIp.equals(fe.getHost())) {
+                continue;
+            }
+            Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, fe.getHost(), fe.getEditLogPort());
+            LOG.info("dropped cloud frontend={} ", fe);
+        }
+    }
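+    // Note: the master FE host is skipped in both loops; cluster sync only adds or drops
+    // observers, never the elected master itself.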
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
index 79b8e9392d2..6150530c8cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
@@ -543,10 +543,10 @@ public class DeployManager extends MasterDaemon {
         try {
             switch (nodeType) {
                 case ELECTABLE:
-                    env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort);
+                    env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort, "");
                     break;
                 case OBSERVER:
-                    env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort);
+                    env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort, "");
                     break;
                 case BACKEND:
                 case BACKEND_CN:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index ba444474628..15d8e51e2c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -661,7 +661,7 @@ public class NodeAction extends RestBaseController {
             }
             HostInfo info = SystemInfoService.getHostAndPort(reqInfo.getHostPort());
             if ("ADD".equals(action)) {
-                currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort());
+                currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort(), "");
             } else if ("DROP".equals(action)) {
                 currentEnv.dropFrontend(frontendNodeType, info.getHost(), info.getPort());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 4ad8fdda671..667290132aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -23,6 +23,7 @@ import org.apache.doris.alter.AlterJobV2.JobType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.load.EtlJobType;
@@ -42,14 +43,17 @@ import org.apache.doris.transaction.TransactionStatus;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -66,6 +70,7 @@ public final class MetricRepo {
 
     public static final String TABLET_NUM = "tablet_num";
     public static final String TABLET_MAX_COMPACTION_SCORE = "tablet_max_compaction_score";
+    public static final String CLOUD_TAG = "cloud";
 
     public static LongCounterMetric COUNTER_REQUEST_ALL;
     public static LongCounterMetric COUNTER_QUERY_ALL;
@@ -127,10 +132,111 @@ public final class MetricRepo {
     public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
     public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
 
+    // cloud metrics
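+    // (per-cluster tables keyed by cluster name; metrics created in registerClusterMetrics()
+    // below carry cluster_id and cluster_name labels)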
+    public static ConcurrentHashMap<String, LongCounterMetric>
+            CLOUD_CLUSTER_COUNTER_REQUEST_ALL = new ConcurrentHashMap<>();
+    public static ConcurrentHashMap<String, LongCounterMetric>
+            CLOUD_CLUSTER_COUNTER_QUERY_ALL = new ConcurrentHashMap<>();
+    public static ConcurrentHashMap<String, LongCounterMetric>
+            CLOUD_CLUSTER_COUNTER_QUERY_ERR = new ConcurrentHashMap<>();
+
+    public static ConcurrentHashMap<String, GaugeMetricImpl<Double>>
+            CLOUD_CLUSTER_GAUGE_QUERY_PER_SECOND = new ConcurrentHashMap<>();
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_UPLOAD_PER_SECOND;
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_UPLOAD_ERR_RATE;
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_QUERY_PER_SECOND;
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_QUERY_ERR_RATE;
+
+    public static ConcurrentHashMap<String, GaugeMetricImpl<Double>>
+            CLOUD_CLUSTER_GAUGE_REQUEST_PER_SECOND = new ConcurrentHashMap<>();
+    public static ConcurrentHashMap<String, GaugeMetricImpl<Double>>
+            CLOUD_CLUSTER_GAUGE_QUERY_ERR_RATE = new ConcurrentHashMap<>();
+
+    public static ConcurrentHashMap<String, Histogram>
+            CLOUD_CLUSTER_HISTO_QUERY_LATENCY = new ConcurrentHashMap<>();
+
+    public static Map<String, GaugeMetricImpl<Integer>>
+            CLOUD_CLUSTER_BACKEND_ALIVE = new HashMap<>();
+    public static Map<String, GaugeMetricImpl<Long>>
+            CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL = new HashMap<>();
+
+    private static Map<Pair<EtlJobType, JobState>, Long> loadJobNum = Maps.newHashMap();
+    private static Long lastCalcLoadJobTime = 0L;
+
    private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
             "metric-timer-pool", true);
     private static MetricCalculator metricCalculator = new MetricCalculator();
 
+    public static void registerClusterMetrics(String clusterName, String clusterId) {
+        CLOUD_CLUSTER_COUNTER_REQUEST_ALL.computeIfAbsent(clusterName, key -> {
+            LongCounterMetric counterRequestAll = new LongCounterMetric("request_total", MetricUnit.REQUESTS,
+                    "total request");
+            counterRequestAll.addLabel(new MetricLabel("cluster_id", clusterId));
+            counterRequestAll.addLabel(new MetricLabel("cluster_name", key));
+            DORIS_METRIC_REGISTER.addMetrics(counterRequestAll);
+            return counterRequestAll;
+        });
+
+        CLOUD_CLUSTER_COUNTER_QUERY_ALL.computeIfAbsent(clusterName, key -> {
+            LongCounterMetric counterQueryAll = new LongCounterMetric("query_total", MetricUnit.REQUESTS,
+                    "total query");
+            counterQueryAll.addLabel(new MetricLabel("cluster_id", clusterId));
+            counterQueryAll.addLabel(new MetricLabel("cluster_name", key));
+            DORIS_METRIC_REGISTER.addMetrics(counterQueryAll);
+            return counterQueryAll;
+        });
+
+        CLOUD_CLUSTER_COUNTER_QUERY_ERR.computeIfAbsent(clusterName, key -> {
+            LongCounterMetric counterQueryErr = new LongCounterMetric("query_err", MetricUnit.REQUESTS,
+                    "total error query");
+            counterQueryErr.addLabel(new MetricLabel("cluster_id", clusterId));
+            counterQueryErr.addLabel(new MetricLabel("cluster_name", key));
+            DORIS_METRIC_REGISTER.addMetrics(counterQueryErr);
+            return counterQueryErr;
+        });
+
+        CLOUD_CLUSTER_GAUGE_QUERY_PER_SECOND.computeIfAbsent(clusterName, key -> {
+            GaugeMetricImpl<Double> gaugeQueryPerSecond = new GaugeMetricImpl<>("qps", MetricUnit.NOUNIT,
+                    "query per second");
+            gaugeQueryPerSecond.addLabel(new MetricLabel("cluster_id", clusterId));
+            gaugeQueryPerSecond.addLabel(new MetricLabel("cluster_name", clusterName));
+            gaugeQueryPerSecond.setValue(0.0);
+            DORIS_METRIC_REGISTER.addMetrics(gaugeQueryPerSecond);
+            return gaugeQueryPerSecond;
+        }).setValue(0.0);
+
+        CLOUD_CLUSTER_GAUGE_REQUEST_PER_SECOND.computeIfAbsent(clusterName, key -> {
+            GaugeMetricImpl<Double> gaugeRequestPerSecond = new GaugeMetricImpl<>("rps", MetricUnit.NOUNIT,
+                    "request per second");
+            gaugeRequestPerSecond.addLabel(new MetricLabel("cluster_id", clusterId));
+            gaugeRequestPerSecond.addLabel(new MetricLabel("cluster_name", clusterName));
+            gaugeRequestPerSecond.setValue(0.0);
+            DORIS_METRIC_REGISTER.addMetrics(gaugeRequestPerSecond);
+            return gaugeRequestPerSecond;
+        }).setValue(0.0);
+
+        CLOUD_CLUSTER_GAUGE_QUERY_ERR_RATE.computeIfAbsent(clusterName, key -> {
+            GaugeMetricImpl<Double> gaugeQueryErrRate = new GaugeMetricImpl<>("query_err_rate",
+                    MetricUnit.NOUNIT, "query error rate");
+            gaugeQueryErrRate.addLabel(new MetricLabel("cluster_id", clusterId));
+            gaugeQueryErrRate.addLabel(new MetricLabel("cluster_name", clusterName));
+            gaugeQueryErrRate.setValue(0.0);
+            DORIS_METRIC_REGISTER.addMetrics(gaugeQueryErrRate);
+            return gaugeQueryErrRate;
+        }).setValue(0.0);
+
+        CLOUD_CLUSTER_HISTO_QUERY_LATENCY.computeIfAbsent(clusterName, key ->
+                METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms", CLOUD_TAG, key)));
+    }
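+    // Registration is idempotent per cluster: computeIfAbsent creates and registers each
+    // metric only on the first call for a given cluster name.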
+
     // init() should only be called after the catalog is constructed.
     public static synchronized void init() {
         if (isInit) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 2f8faa06b1f..f672770c4fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -213,6 +213,10 @@ public class Backend implements Writable {
         return id;
     }
 
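+    // Returns "host:heartbeatPort"; cloud cluster sync uses this as a backend identity key.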
+    public String getAddress() {
+        return host + ":" + heartbeatPort;
+    }
+
     public String getHost() {
         return host;
     }
@@ -370,6 +374,10 @@ public class Backend implements Writable {
         return lastMissingHeartbeatTime;
     }
 
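+    // Used by cloud mode when dropping a BE, so lastMissingHeartbeatTime reflects the
+    // drop time (see CloudSystemInfoService.updateCloudBackends).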
+    public void setLastMissingHeartbeatTime(long lastMissingHeartbeatTime) {
+        this.lastMissingHeartbeatTime = lastMissingHeartbeatTime;
+    }
+
     // Backend process epoch, used to tag a backend process
     // Currently it is always equal to the BE start time, even during oplog replay.
     public long getProcessEpoch() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 62511e25e47..d4140c2b01f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -86,19 +86,19 @@ public class SystemInfoService {
     public static final String NOT_USING_VALID_CLUSTER_MSG = "Not using valid cloud clusters, "
             + "please use a cluster before issuing any queries";
 
-    private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
-    private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
+    protected volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
+    protected volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
     // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk of inconsistency
     // use exclusive lock to make sure only one thread can change clusterIdToBackend and clusterNameToId
-    private ReentrantLock lock = new ReentrantLock();
+    protected ReentrantLock lock = new ReentrantLock();
 
     // for show cluster and cache user owned cluster
     // mysqlUserName -> List of ClusterPB
     private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = ImmutableMap.of();
     // clusterId -> List<Backend>
-    private Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
+    protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
     // clusterName -> clusterId
-    private Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
+    protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
 
     private volatile ImmutableMap<Long, DiskInfo> pathHashToDiskInfoRef = ImmutableMap.of();
 
@@ -1155,4 +1155,8 @@ public class SystemInfoService {
         }
         this.instanceStatus = instanceStatus;
     }
+
+    public synchronized void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+        LOG.warn("not in cloud mode, updateCloudBackends should not be called");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
