This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 57593c0c77e7c5d55f75b16597f465b9d00563fd
Author: deardeng <[email protected]>
AuthorDate: Thu Jun 13 20:16:19 2024 +0800

    (cloud-merge) Support BE smooth upgrade (#35819)
---
 be/src/http/action/shrink_mem_action.cpp           |  40 +++
 be/src/http/action/shrink_mem_action.h             |  32 ++
 be/src/service/http_service.cpp                    |   5 +
 .../doris/cloud/catalog/CloudClusterChecker.java   |   3 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  12 +-
 .../doris/cloud/catalog/CloudUpgradeMgr.java       |   1 +
 .../doris/cloud/system/CloudSystemInfoService.java | 384 +++++++++++++--------
 .../doris/statistics/util/StatisticsUtil.java      |   4 +
 8 files changed, 322 insertions(+), 159 deletions(-)

diff --git a/be/src/http/action/shrink_mem_action.cpp 
b/be/src/http/action/shrink_mem_action.cpp
new file mode 100644
index 00000000000..547f08ee82d
--- /dev/null
+++ b/be/src/http/action/shrink_mem_action.cpp
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/shrink_mem_action.h"
+
+#include <fmt/core.h>
+
+#include "http/http_channel.h"
+#include "http/http_request.h"
+#include "runtime/exec_env.h"
+#include "util/brpc_client_cache.h"
+#include "util/mem_info.h"
+#include "util/string_util.h"
+
+namespace doris {
+void ShrinkMemAction::handle(HttpRequest* req) {
+    LOG(INFO) << "begin shrink memory";
+    // Shrink memory by running a process minor GC, e.g. to make room for a co-existing BE
+    // process during a smooth upgrade. Cache GC (MemInfo::process_cache_gc) is not used here
+    // yet; it might be enabled for cloud in the near future.
+    MemInfo::process_minor_gc();
+    LOG(INFO) << "shrink memory triggered, using Process Minor GC Free Memory";
+    HttpChannel::send_reply(req, HttpStatus::OK, "shrinking");
+}
+
+} // namespace doris
diff --git a/be/src/http/action/shrink_mem_action.h 
b/be/src/http/action/shrink_mem_action.h
new file mode 100644
index 00000000000..2ffae60327f
--- /dev/null
+++ b/be/src/http/action/shrink_mem_action.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "http/http_handler.h"
+
+namespace doris {
+// Handles GET /api/shrink_mem: shrinks memory so a co-existing BE can start during upgrade.
+class ShrinkMemAction : public HttpHandler {
+public:
+    ShrinkMemAction() = default;
+
+    ~ShrinkMemAction() override = default;
+
+    void handle(HttpRequest* req) override;
+};
+} // namespace doris
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 1cbca7bacf7..ba56d2cb857 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -54,6 +54,7 @@
 #include "http/action/reset_rpc_channel_action.h"
 #include "http/action/restore_tablet_action.h"
 #include "http/action/show_hotspot_action.h"
+#include "http/action/shrink_mem_action.h"
 #include "http/action/snapshot_action.h"
 #include "http/action/stream_load.h"
 #include "http/action/stream_load_2pc.h"
@@ -218,6 +219,10 @@ Status HttpService::start() {
             new ReportAction(_env, TPrivilegeHier::GLOBAL, 
TPrivilegeType::ADMIN, "REPORT_TASK"));
     _ev_http_server->register_handler(HttpMethod::GET, "/api/report/task", 
report_task_action);
 
+    // shrink memory for starting co-exist process during upgrade
+    ShrinkMemAction* shrink_mem_action = _pool.add(new ShrinkMemAction());
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/shrink_mem", 
shrink_mem_action);
+
     auto& engine = _env->storage_engine();
     if (config::is_cloud_mode()) {
         register_cloud_handler(engine.to_cloud());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 273ec422a51..72fd2ba353a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -103,7 +103,6 @@ public class CloudClusterChecker extends MasterDaemon {
                 ClusterStatus clusterStatus = 
remoteClusterIdToPB.get(addId).hasClusterStatus()
                         ? remoteClusterIdToPB.get(addId).getClusterStatus() : 
ClusterStatus.NORMAL;
                 MetricRepo.registerCloudMetrics(clusterId, clusterName);
-                //toAdd.forEach(i -> i.setTagMap(newTagMap));
                 List<Backend> toAdd = new ArrayList<>();
                 for (Cloud.NodeInfoPB node : 
remoteClusterIdToPB.get(addId).getNodesList()) {
                     String addr = Config.enable_fqdn_mode ? node.getHost() : 
node.getIp();
@@ -449,6 +448,8 @@ public class CloudClusterChecker extends MasterDaemon {
             // local - remote > 0, drop bes from local
             checkToDelCluster(remoteClusterIdToPB, localClusterIds, 
clusterIdToBackend);
 
+            clusterIdToBackend = 
cloudSystemInfoService.getCloudClusterIdToBackend();
+
             if (remoteClusterIdToPB.keySet().size() != 
clusterIdToBackend.keySet().size()) {
                 LOG.warn("impossible cluster id size not match, check it local 
{}, remote {}",
                         clusterIdToBackend, remoteClusterIdToPB);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 8095eeaeec7..30149de0d56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -33,6 +33,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -187,9 +188,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         // 3 check whether the inflight preheating task has been completed
         checkInflghtWarmUpCacheAsync();
 
-        // TODO(merge-cloud): wait add cloud upgrade mgr
         // 4 migrate tablet for smooth upgrade
-        /*
         Pair<Long, Long> pair;
         statRouteInfo();
         while (!tabletsMigrateTasks.isEmpty()) {
@@ -201,7 +200,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
             LOG.info("begin tablets migration from be {} to be {}", 
pair.first, pair.second);
             migrateTablets(pair.first, pair.second);
         }
-         */
 
         // 5 statistics be to tablets mapping information
         statRouteInfo();
@@ -923,14 +921,12 @@ public class CloudTabletRebalancer extends MasterDaemon {
             }
         }
 
-        // TODO(merge-cloud): wait add cloud upgrade mgr
-        /*
         try {
-            
Env.getCurrentEnv().getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
-        } catch (AnalysisException e) {
+            ((CloudEnv) 
Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
+        } catch (UserException e) {
+            LOG.warn("registerWaterShedTxnId get exception", e);
             throw new RuntimeException(e);
         }
-         */
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 1cb1aeb2568..58ae0256cd0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -92,6 +92,7 @@ public class CloudUpgradeMgr extends MasterDaemon {
                         isFinished = true;
                     }
                 } catch (AnalysisException e) {
+                    LOG.warn("cloud upgrade mgr exception", e);
                     throw new RuntimeException(e);
                 }
                 if (!isFinished) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index e764bf3cb6e..cb0a6f8a3a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.system;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.proto.Cloud.ClusterPB;
 import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
@@ -47,25 +48,29 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class CloudSystemInfoService extends SystemInfoService {
     private static final Logger LOG = 
LogManager.getLogger(CloudSystemInfoService.class);
 
-    // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk 
of inconsistency
-    // use exclusive lock to make sure only one thread can change 
clusterIdToBackend and clusterNameToId
-    protected ReentrantLock lock = new ReentrantLock();
+    // use rw lock to make sure only one thread can change clusterIdToBackend 
and clusterNameToId
+    private static final ReadWriteLock rwlock = new ReentrantReadWriteLock();
+
+    private static final Lock rlock = rwlock.readLock();
+
+    private static final Lock wlock = rwlock.writeLock();
 
     // for show cluster and cache user owned cluster
-    // mysqlUserName -> List of ClusterPB
-    private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = 
ImmutableMap.of();
     // clusterId -> List<Backend>
     protected Map<String, List<Backend>> clusterIdToBackend = new 
ConcurrentHashMap<>();
     // clusterName -> clusterId
@@ -104,8 +109,16 @@ public class CloudSystemInfoService extends 
SystemInfoService {
         }
     }
 
+    public void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+        wlock.lock();
+        try {
+            updateCloudBackendsUnLock(toAdd, toDel);
+        } finally {
+            wlock.unlock();
+        }
+    }
 
-    public synchronized void updateCloudBackends(List<Backend> toAdd, 
List<Backend> toDel) {
+    public void updateCloudBackendsUnLock(List<Backend> toAdd, List<Backend> 
toDel) {
         // Deduplicate and validate
         if (toAdd.isEmpty() && toDel.isEmpty()) {
             LOG.info("nothing to do");
@@ -115,8 +128,7 @@ public class CloudSystemInfoService extends 
SystemInfoService {
                 .map(i -> i.getHost() + ":" + i.getHeartbeatPort())
                 .collect(Collectors.toSet());
         if (LOG.isDebugEnabled()) {
-            LOG.debug("deduplication existedBes={}", existedBes);
-            LOG.debug("before deduplication toAdd={} toDel={}", toAdd, toDel);
+            LOG.debug("deduplication existedBes={}, before deduplication 
toAdd={} toDel={}", existedBes, toAdd, toDel);
         }
         toAdd = toAdd.stream().filter(i -> !existedBes.contains(i.getHost() + 
":" + i.getHeartbeatPort()))
             .collect(Collectors.toList());
@@ -175,92 +187,94 @@ public class CloudSystemInfoService extends 
SystemInfoService {
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = 
ImmutableMap.copyOf(copiedReportVersions);
         idToReportVersionRef = newIdToReportVersion;
 
-        updateCloudClusterMap(toAdd, toDel);
+        updateCloudClusterMapNoLock(toAdd, toDel);
     }
 
     private void handleNewBeOnSameNode(Backend oldBe, Backend newBe) {
         LOG.info("new BE {} starts on the same node as existing BE {}", 
newBe.getId(), oldBe.getId());
-        // TODO(merge-cloud): add it when has CloudTabletRebalancer.
-        // 
Env.getCurrentEnv().getCloudTabletRebalancer().addTabletMigrationTask(oldBe.getId(),
 newBe.getId());
+        ((CloudEnv) Env.getCurrentEnv()).getCloudTabletRebalancer()
+                .addTabletMigrationTask(oldBe.getId(), newBe.getId());
     }
 
     public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> 
toDel) {
-        lock.lock();
+        wlock.lock();
         try {
-            Set<String> clusterNameSet = new HashSet<>();
-            for (Backend b : toAdd) {
-                String clusterName = b.getCloudClusterName();
-                String clusterId = b.getCloudClusterId();
-                if (clusterName.isEmpty() || clusterId.isEmpty()) {
-                    LOG.warn("cloud cluster name or id empty: id={}, name={}", 
clusterId, clusterName);
-                    continue;
-                }
-                clusterNameSet.add(clusterName);
-                if (clusterNameSet.size() != 1) {
-                    LOG.warn("toAdd be list have multi clusterName, please 
check, Set: {}", clusterNameSet);
-                }
+            updateCloudClusterMapNoLock(toAdd, toDel);
+        } finally {
+            wlock.unlock();
+        }
+    }
 
-                clusterNameToId.put(clusterName, clusterId);
-                List<Backend> be = clusterIdToBackend.get(clusterId);
-                if (be == null) {
-                    be = new ArrayList<>();
-                    clusterIdToBackend.put(clusterId, be);
-                    MetricRepo.registerCloudMetrics(clusterId, clusterName);
-                }
-                Set<String> existed = be.stream().map(i -> i.getHost() + ":" + 
i.getHeartbeatPort())
-                        .collect(Collectors.toSet());
-                // Deduplicate
-                // TODO(gavin): consider vpc
-                boolean alreadyExisted = existed.contains(b.getHost() + ":" + 
b.getHeartbeatPort());
-                if (alreadyExisted) {
-                    LOG.info("BE already existed, clusterName={} clusterId={} 
backendNum={} backend={}",
-                            clusterName, clusterId, be.size(), b);
-                    continue;
-                }
-                be.add(b);
-                LOG.info("update (add) cloud cluster map, clusterName={} 
clusterId={} backendNum={} current backend={}",
+    public void updateCloudClusterMapNoLock(List<Backend> toAdd, List<Backend> 
toDel) {
+        Set<String> clusterNameSet = new HashSet<>();
+        for (Backend b : toAdd) {
+            String clusterName = b.getCloudClusterName();
+            String clusterId = b.getCloudClusterId();
+            if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                LOG.warn("cloud cluster name or id empty: id={}, name={}", 
clusterId, clusterName);
+                continue;
+            }
+            clusterNameSet.add(clusterName);
+            if (clusterNameSet.size() != 1) {
+                LOG.warn("toAdd be list have multi clusterName, please check, 
Set: {}", clusterNameSet);
+            }
+
+            clusterNameToId.put(clusterName, clusterId);
+            List<Backend> be = clusterIdToBackend.get(clusterId);
+            if (be == null) {
+                be = new ArrayList<>();
+                clusterIdToBackend.put(clusterId, be);
+                MetricRepo.registerCloudMetrics(clusterId, clusterName);
+            }
+            Set<String> existed = be.stream().map(i -> i.getHost() + ":" + 
i.getHeartbeatPort())
+                    .collect(Collectors.toSet());
+            // Deduplicate
+            boolean alreadyExisted = existed.contains(b.getHost() + ":" + 
b.getHeartbeatPort());
+            if (alreadyExisted) {
+                LOG.info("BE already existed, clusterName={} clusterId={} 
backendNum={} backend={}",
                         clusterName, clusterId, be.size(), b);
+                continue;
             }
+            be.add(b);
+            LOG.info("update (add) cloud cluster map, clusterName={} 
clusterId={} backendNum={} current backend={}",
+                    clusterName, clusterId, be.size(), b);
+        }
 
-            for (Backend b : toDel) {
-                String clusterName = b.getCloudClusterName();
-                String clusterId = b.getCloudClusterId();
-                // We actually don't care about cluster name here
-                if (clusterName.isEmpty() || clusterId.isEmpty()) {
-                    LOG.warn("cloud cluster name or id empty: id={}, name={}", 
clusterId, clusterName);
-                    continue;
-                }
-                List<Backend> be = clusterIdToBackend.get(clusterId);
-                if (be == null) {
-                    LOG.warn("try to remove a non-existing cluster, 
clusterId={} clusterName={}",
-                            clusterId, clusterName);
-                    continue;
-                }
-                Set<Long> d = toDel.stream().map(i -> 
i.getId()).collect(Collectors.toSet());
-                be = be.stream().filter(i -> 
!d.contains(i.getId())).collect(Collectors.toList());
-                // ATTN: clusterId may have zero nodes
-                clusterIdToBackend.replace(clusterId, be);
-                // such as dropCluster, but no lock
-                // ATTN: Empty clusters are treated as dropped clusters.
-                if (be.size() == 0) {
-                    LOG.info("del clusterId {} and clusterName {} due to be 
nodes eq 0", clusterId, clusterName);
-                    boolean succ = clusterNameToId.remove(clusterName, 
clusterId);
-                    if (!succ) {
-                        LOG.warn("impossible, somewhere err, clusterNameToId 
{}, "
-                                + "want remove cluster name {}, cluster id {}",
-                                clusterNameToId, clusterName, clusterId);
-                    }
-                    clusterIdToBackend.remove(clusterId);
+        for (Backend b : toDel) {
+            String clusterName = b.getCloudClusterName();
+            String clusterId = b.getCloudClusterId();
+            // We actually don't care about cluster name here
+            if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                LOG.warn("cloud cluster name or id empty: id={}, name={}", 
clusterId, clusterName);
+                continue;
+            }
+            List<Backend> be = clusterIdToBackend.get(clusterId);
+            if (be == null) {
+                LOG.warn("try to remove a non-existing cluster, clusterId={} 
clusterName={}",
+                        clusterId, clusterName);
+                continue;
+            }
+            Set<Long> d = 
toDel.stream().map(Backend::getId).collect(Collectors.toSet());
+            be = be.stream().filter(i -> 
!d.contains(i.getId())).collect(Collectors.toList());
+            // ATTN: clusterId may have zero nodes
+            clusterIdToBackend.replace(clusterId, be);
+            // such as dropCluster, but no lock
+            // ATTN: Empty clusters are treated as dropped clusters.
+            if (be.isEmpty()) {
+                LOG.info("del clusterId {} and clusterName {} due to be nodes 
eq 0", clusterId, clusterName);
+                boolean succ = clusterNameToId.remove(clusterName, clusterId);
+                if (!succ) {
+                    LOG.warn("impossible, somewhere err, clusterNameToId {}, "
+                            + "want remove cluster name {}, cluster id {}",
+                            clusterNameToId, clusterName, clusterId);
                 }
-                LOG.info("update (del) cloud cluster map, clusterName={} 
clusterId={} backendNum={} current backend={}",
-                        clusterName, clusterId, be.size(), b);
+                clusterIdToBackend.remove(clusterId);
             }
-        } finally {
-            lock.unlock();
+            LOG.info("update (del) cloud cluster map, clusterName={} 
clusterId={} backendNum={} current backend={}",
+                    clusterName, clusterId, be.size(), b);
         }
     }
 
-
     public synchronized void updateFrontends(List<Frontend> toAdd, 
List<Frontend> toDel)
             throws DdlException {
         if (LOG.isDebugEnabled()) {
@@ -321,15 +335,26 @@ public class CloudSystemInfoService extends 
SystemInfoService {
         if (FeConstants.runningUnitTest) {
             return true;
         }
-        if (null == clusterNameToId || clusterNameToId.isEmpty()) {
-            return false;
+        rlock.lock();
+        try {
+            if (null == clusterNameToId || clusterNameToId.isEmpty()) {
+                return false;
+            }
+
+            return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
+                && clusterIdToBackend.values().stream().anyMatch(list -> list 
!= null && !list.isEmpty());
+        } finally {
+            rlock.unlock();
         }
-        return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
-            && clusterIdToBackend.values().stream().anyMatch(list -> list != 
null && !list.isEmpty());
     }
 
     public boolean containClusterName(String clusterName) {
-        return clusterNameToId.containsKey(clusterName);
+        rlock.lock();
+        try {
+            return clusterNameToId.containsKey(clusterName);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     @Override
@@ -370,57 +395,98 @@ public class CloudSystemInfoService extends 
SystemInfoService {
     }
 
     public List<Backend> getBackendsByClusterName(final String clusterName) {
-        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
-        if (clusterId.isEmpty()) {
-            return new ArrayList<>();
+        rlock.lock();
+        try {
+            String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+            if (clusterId.isEmpty()) {
+                return new ArrayList<>();
+            }
+            // copy a new List
+            return new ArrayList<>(clusterIdToBackend.getOrDefault(clusterId, 
new ArrayList<>()));
+        } finally {
+            rlock.unlock();
         }
-        return clusterIdToBackend.get(clusterId);
     }
 
     public List<Backend> getBackendsByClusterId(final String clusterId) {
-        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
+        rlock.lock();
+        try {
+            // copy a new List
+            return new ArrayList<>(clusterIdToBackend.getOrDefault(clusterId, 
new ArrayList<>()));
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public String getClusterIdByBeAddr(String beEndpoint) {
-        for (Map.Entry<String, List<Backend>> idBe : 
clusterIdToBackend.entrySet()) {
-            if (idBe.getValue().stream().anyMatch(be -> 
be.getAddress().equals(beEndpoint))) {
-                return getClusterNameByClusterId(idBe.getKey());
+        rlock.lock();
+        try {
+            for (Map.Entry<String, List<Backend>> idBe : 
clusterIdToBackend.entrySet()) {
+                if (idBe.getValue().stream().anyMatch(be -> 
be.getAddress().equals(beEndpoint))) {
+                    return getClusterNameByClusterIdNoLock(idBe.getKey());
+                }
             }
+            return null;
+        } finally {
+            rlock.unlock();
         }
-        return null;
     }
 
     public List<String> getCloudClusterIds() {
-        return new ArrayList<>(clusterIdToBackend.keySet());
+        rlock.lock();
+        try {
+            return new ArrayList<>(clusterIdToBackend.keySet());
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public String getCloudStatusByName(final String clusterName) {
-        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
-        if (Strings.isNullOrEmpty(clusterId)) {
-            // for rename cluster or dropped cluster
-            LOG.warn("cant find clusterId by clusterName {}", clusterName);
-            return "";
+        rlock.lock();
+        try {
+            String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+            if (Strings.isNullOrEmpty(clusterId)) {
+                // the cluster may have been renamed or dropped
+                LOG.warn("cant find clusterId by clusterName {}", clusterName);
+                return "";
+            }
+            return getCloudStatusById(clusterId);
+        } finally {
+            rlock.unlock();
         }
-        return getCloudStatusById(clusterId);
     }
 
     public String getCloudStatusById(final String clusterId) {
-        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
-            
.stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
+        rlock.lock();
+        try {
+            return clusterIdToBackend.getOrDefault(clusterId, new 
ArrayList<>())
+                
.stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public void updateClusterNameToId(final String newName,
                                       final String originalName, final String 
clusterId) {
-        lock.lock();
+        wlock.lock();
         try {
             clusterNameToId.remove(originalName);
             clusterNameToId.put(newName, clusterId);
         } finally {
-            lock.unlock();
+            wlock.unlock();
         }
     }
 
     public String getClusterNameByClusterId(final String clusterId) {
+        rlock.lock();
+        try {
+            return getClusterNameByClusterIdNoLock(clusterId);
+        } finally {
+            rlock.unlock();
+        }
+    }
+
+    public String getClusterNameByClusterIdNoLock(final String clusterId) {
         String clusterName = "";
         for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
             if (entry.getValue().equals(clusterId)) {
@@ -432,55 +498,64 @@ public class CloudSystemInfoService extends 
SystemInfoService {
     }
 
     public void dropCluster(final String clusterId, final String clusterName) {
-        lock.lock();
+        wlock.lock();
         try {
             clusterNameToId.remove(clusterName, clusterId);
             clusterIdToBackend.remove(clusterId);
         } finally {
-            lock.unlock();
+            wlock.unlock();
         }
     }
 
     public List<String> getCloudClusterNames() {
-        return new ArrayList<>(clusterNameToId.keySet());
+        rlock.lock();
+        try {
+            return clusterNameToId.keySet().stream().filter(c -> !Strings.isNullOrEmpty(c))
+                .sorted(Comparator.naturalOrder()).collect(Collectors.toList());
+        } finally {
+            rlock.unlock();
+        }
     }
 
     // use cluster $clusterName
     // return clusterName for userName
     public String addCloudCluster(final String clusterName, final String 
userName) throws UserException {
-        lock.lock();
         if ((Strings.isNullOrEmpty(clusterName) && 
Strings.isNullOrEmpty(userName))
                 || (!Strings.isNullOrEmpty(clusterName) && 
!Strings.isNullOrEmpty(userName))) {
             // clusterName or userName just only need one.
-            lock.unlock();
             LOG.warn("addCloudCluster args err clusterName {}, userName {}", 
clusterName, userName);
             return "";
         }
-        // First time this method is called, build cloud cluster map
-        if (clusterNameToId.isEmpty() || clusterIdToBackend.isEmpty()) {
-            List<Backend> toAdd = Maps.newHashMap(idToBackendRef)
-                    .values().stream()
-                    .filter(i -> 
i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_ID))
-                    .filter(i -> 
i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_NAME))
-                    .collect(Collectors.toList());
-            // The larger bakendId the later it was added, the order matters
-            toAdd.sort((x, y) -> (int) (x.getId() - y.getId()));
-            updateCloudClusterMap(toAdd, new ArrayList<>());
-        }
 
         String clusterId;
-        if (Strings.isNullOrEmpty(userName)) {
-            // use clusterName
-            LOG.info("try to add a cloud cluster, clusterName={}", 
clusterName);
-            clusterId = clusterNameToId.get(clusterName);
-            clusterId = clusterId == null ? "" : clusterId;
-            if (clusterIdToBackend.containsKey(clusterId)) { // Cluster 
already added
-                lock.unlock();
-                LOG.info("cloud cluster already added, clusterName={}, 
clusterId={}", clusterName, clusterId);
-                return "";
+        wlock.lock();
+        try {
+            // First time this method is called, build cloud cluster map
+            if (clusterNameToId.isEmpty() || clusterIdToBackend.isEmpty()) {
+                List<Backend> toAdd = Maps.newHashMap(idToBackendRef)
+                        .values().stream()
+                        .filter(i -> 
i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_ID))
+                        .filter(i -> 
i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_NAME))
+                        .collect(Collectors.toList());
+                // The larger bakendId the later it was added, the order 
matters
+                toAdd.sort((x, y) -> (int) (x.getId() - y.getId()));
+                updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+            }
+
+            if (Strings.isNullOrEmpty(userName)) {
+                // use clusterName
+                LOG.info("try to add a cloud cluster, clusterName={}", 
clusterName);
+                clusterId = clusterNameToId.get(clusterName);
+                clusterId = clusterId == null ? "" : clusterId;
+                if (clusterIdToBackend.containsKey(clusterId)) { // Cluster 
already added
+                    LOG.info("cloud cluster already added, clusterName={}, 
clusterId={}", clusterName, clusterId);
+                    return "";
+                }
             }
+        } finally {
+            wlock.unlock();
         }
-        lock.unlock();
+
         LOG.info("begin to get cloud cluster from remote, clusterName={}, 
userName={}", clusterName, userName);
 
         // Get cloud cluster info from resource manager
@@ -505,11 +580,11 @@ public class CloudSystemInfoService extends 
SystemInfoService {
         String clusterNameMeta = cpb.getClusterName();
 
         // Prepare backends
-        Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
-        newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
-        newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
         List<Backend> backends = new ArrayList<>();
         for (Cloud.NodeInfoPB node : cpb.getNodesList()) {
+            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
+            newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
             Backend b = new Backend(Env.getCurrentEnv().getNextId(), 
node.getIp(), node.getHeartbeatPort());
             b.setTagMap(newTagMap);
             backends.add(b);
@@ -521,19 +596,35 @@ public class CloudSystemInfoService extends 
SystemInfoService {
         return clusterNameMeta;
     }
 
-
     // Return the ref of concurrentMap clusterIdToBackend
-    // It should be thread-safe to iterate.
-    // reference: 
https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
     public Map<String, List<Backend>> getCloudClusterIdToBackend() {
-        return clusterIdToBackend;
+        rlock.lock();
+        try {
+            return new ConcurrentHashMap<>(clusterIdToBackend);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public String getCloudClusterIdByName(String clusterName) {
-        return clusterNameToId.get(clusterName);
+        rlock.lock();
+        try {
+            return clusterNameToId.get(clusterName);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) 
{
+        rlock.lock();
+        try {
+            return getCloudIdToBackendNoLock(clusterName);
+        } finally {
+            rlock.unlock();
+        }
+    }
+
+    public ImmutableMap<Long, Backend> getCloudIdToBackendNoLock(String 
clusterName) {
         String clusterId = clusterNameToId.get(clusterName);
         if (Strings.isNullOrEmpty(clusterId)) {
             LOG.warn("cant find clusterId, this cluster may be has been 
dropped, clusterName={}", clusterName);
@@ -548,18 +639,13 @@ public class CloudSystemInfoService extends 
SystemInfoService {
     }
 
     // Return the ref of concurrentMap clusterNameToId
-    // It should be thread-safe to iterate.
-    // reference: 
https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
     public Map<String, String> getCloudClusterNameToId() {
-        return clusterNameToId;
-    }
-
-    public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() {
-        return mysqlUserNameToClusterPB;
-    }
-
-    public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m) 
{
-        mysqlUserNameToClusterPB = m;
+        rlock.lock();
+        try {
+            return new ConcurrentHashMap<>(clusterNameToId);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public List<Pair<String, Integer>> getCurrentObFrontends() {
@@ -724,6 +810,4 @@ public class CloudSystemInfoService extends 
SystemInfoService {
             LOG.info("auto start cluster {}, start cost {} ms", clusterName, 
stopWatch.getTime());
         }
     }
-
-
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index b5076466d2b..646d0235b5c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.TableAttributes;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.VariantType;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -481,6 +482,9 @@ public class StatisticsUtil {
             return false;
         }
         if (Config.isCloudMode()) {
+            if (!((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).availableBackendsExists()) {
+                return false;
+            }
             try (AutoCloseConnectContext r = buildConnectContext()) {
                 r.connectContext.getCloudCluster();
                 for (OlapTable table : statsTbls) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to