This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 57593c0c77e7c5d55f75b16597f465b9d00563fd
Author: deardeng <[email protected]>
AuthorDate: Thu Jun 13 20:16:19 2024 +0800

    (cloud-merge) Support BE smooth upgrade (#35819)
---
 be/src/http/action/shrink_mem_action.cpp           |  40 +++
 be/src/http/action/shrink_mem_action.h             |  32 ++
 be/src/service/http_service.cpp                    |   5 +
 .../doris/cloud/catalog/CloudClusterChecker.java   |   3 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  12 +-
 .../doris/cloud/catalog/CloudUpgradeMgr.java       |   1 +
 .../doris/cloud/system/CloudSystemInfoService.java | 384 +++++++++++++--------
 .../doris/statistics/util/StatisticsUtil.java      |   4 +
 8 files changed, 322 insertions(+), 159 deletions(-)

diff --git a/be/src/http/action/shrink_mem_action.cpp b/be/src/http/action/shrink_mem_action.cpp
new file mode 100644
index 00000000000..547f08ee82d
--- /dev/null
+++ b/be/src/http/action/shrink_mem_action.cpp
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/shrink_mem_action.h"
+
+#include <fmt/core.h>
+
+#include "http/http_channel.h"
+#include "http/http_request.h"
+#include "runtime/exec_env.h"
+#include "util/brpc_client_cache.h"
+#include "util/mem_info.h"
+#include "util/string_util.h"
+
+namespace doris {
+void ShrinkMemAction::handle(HttpRequest* req) {
+    LOG(INFO) << "begin shrink memory";
+    /* this interface might be ready for cloud in the near future
+     * int freed_mem = 0;
+     * doris::MemInfo::process_cache_gc(&freed_mem); */
+    MemInfo::process_minor_gc();
+    LOG(INFO) << "shrink memory triggered, using Process Minor GC Free Memory";
+    HttpChannel::send_reply(req, HttpStatus::OK, "shrinking");
+}
+
+} // namespace doris
diff --git a/be/src/http/action/shrink_mem_action.h b/be/src/http/action/shrink_mem_action.h
new file mode 100644
index 00000000000..2ffae60327f
--- /dev/null
+++ b/be/src/http/action/shrink_mem_action.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "http/http_handler.h"
+
+namespace doris {
+class ExecEnv;
+class ShrinkMemAction : public HttpHandler {
+public:
+    explicit ShrinkMemAction() {}
+
+    virtual ~ShrinkMemAction() {}
+
+    void handle(HttpRequest* req) override;
+};
+} // namespace doris
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 1cbca7bacf7..ba56d2cb857 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -54,6 +54,7 @@
 #include "http/action/reset_rpc_channel_action.h"
 #include "http/action/restore_tablet_action.h"
 #include "http/action/show_hotspot_action.h"
+#include "http/action/shrink_mem_action.h"
 #include "http/action/snapshot_action.h"
 #include "http/action/stream_load.h"
 #include "http/action/stream_load_2pc.h"
@@ -218,6 +219,10 @@ Status HttpService::start() {
             new ReportAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_TASK"));
     _ev_http_server->register_handler(HttpMethod::GET, "/api/report/task", report_task_action);
 
+    // shrink memory for starting co-exist process during upgrade
+    ShrinkMemAction* shrink_mem_action = _pool.add(new ShrinkMemAction());
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/shrink_mem", shrink_mem_action);
+
     auto& engine = _env->storage_engine();
     if (config::is_cloud_mode()) {
         register_cloud_handler(engine.to_cloud());
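A note for readers following along: the BE-side change is self-contained — a new GET handler at /api/shrink_mem triggers a minor memory GC so a co-existing BE process can be started on the same node during a smooth upgrade. A minimal caller sketch in Java (the host and the 8040 webserver port are placeholder assumptions, not part of this commit):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    // Hypothetical upgrade driver: ask the old BE to release memory before
    // launching the new BE process alongside it.
    public class ShrinkMemCaller {
        public static void main(String[] args) throws IOException {
            URL url = new URL("http://127.0.0.1:8040/api/shrink_mem"); // placeholder host:port
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            // The handler runs the minor GC before replying "shrinking", so a
            // 200 response means the shrink pass has already executed.
            System.out.println("shrink_mem returned HTTP " + conn.getResponseCode());
            conn.disconnect();
        }
    }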
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 273ec422a51..72fd2ba353a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -103,7 +103,6 @@ public class CloudClusterChecker extends MasterDaemon {
                     ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
                             ? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
                     MetricRepo.registerCloudMetrics(clusterId, clusterName);
-                    //toAdd.forEach(i -> i.setTagMap(newTagMap));
                     List<Backend> toAdd = new ArrayList<>();
                     for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) {
                         String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
@@ -449,6 +448,8 @@ public class CloudClusterChecker extends MasterDaemon {
         // local - remote > 0, drop bes from local
         checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend);
 
+        clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();
+
         if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) {
             LOG.warn("impossible cluster id size not match, check it local {}, remote {}",
                     clusterIdToBackend, remoteClusterIdToPB);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 8095eeaeec7..30149de0d56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -33,6 +33,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -187,9 +188,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         // 3 check whether the inflight preheating task has been completed
         checkInflghtWarmUpCacheAsync();
 
-        // TODO(merge-cloud): wait add cloud upgrade mgr
         // 4 migrate tablet for smooth upgrade
-        /*
         Pair<Long, Long> pair;
         statRouteInfo();
         while (!tabletsMigrateTasks.isEmpty()) {
@@ -201,7 +200,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
             LOG.info("begin tablets migration from be {} to be {}", pair.first, pair.second);
             migrateTablets(pair.first, pair.second);
         }
-        */
 
         // 5 statistics be to tablets mapping information
         statRouteInfo();
@@ -923,14 +921,12 @@ public class CloudTabletRebalancer extends MasterDaemon {
             }
         }
 
-        // TODO(merge-cloud): wait add cloud upgrade mgr
-        /*
         try {
-            Env.getCurrentEnv().getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
-        } catch (AnalysisException e) {
+            ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
+        } catch (UserException e) {
+            LOG.warn("registerWaterShedTxnId get exception", e);
             throw new RuntimeException(e);
         }
-        */
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 1cb1aeb2568..58ae0256cd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -92,6 +92,7 @@ public class CloudUpgradeMgr extends MasterDaemon {
                     isFinished = true;
                 }
             } catch (AnalysisException e) {
+                LOG.warn("cloud upgrade mgr exception", e);
                 throw new RuntimeException(e);
             }
             if (!isFinished) {
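For readers tracing the smooth-upgrade flow: migrateTablets() now registers a watershed transaction ID for the source BE, and the CloudUpgradeMgr daemon shown above polls until every transaction before that watershed has finished. A hedged sketch of that wait-loop shape — all names here are illustrative stand-ins, not the commit's API:

    import java.util.concurrent.TimeUnit;

    // Sketch only: poll a transaction checker until everything before the
    // recorded watershed transaction ID has finished.
    final class WatershedWaitSketch {
        interface TxnChecker {
            boolean isPreviousTxnsFinished(long watershedTxnId) throws Exception;
        }

        static void awaitWatershed(TxnChecker checker, long watershedTxnId)
                throws InterruptedException {
            boolean isFinished = false;
            while (!isFinished) {
                try {
                    isFinished = checker.isPreviousTxnsFinished(watershedTxnId);
                } catch (Exception e) {
                    // mirrors the patch: surface the failure instead of spinning silently
                    throw new RuntimeException(e);
                }
                if (!isFinished) {
                    TimeUnit.SECONDS.sleep(5); // poll interval is an assumption
                }
            }
        }
    }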
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index e764bf3cb6e..cb0a6f8a3a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.system;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.proto.Cloud.ClusterPB;
 import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
@@ -47,25 +48,29 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class CloudSystemInfoService extends SystemInfoService {
 
     private static final Logger LOG = LogManager.getLogger(CloudSystemInfoService.class);
 
-    // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk of inconsistency
-    // use exclusive lock to make sure only one thread can change clusterIdToBackend and clusterNameToId
-    protected ReentrantLock lock = new ReentrantLock();
+    // use rw lock to make sure only one thread can change clusterIdToBackend and clusterNameToId
+    private static final ReadWriteLock rwlock = new ReentrantReadWriteLock();
+
+    private static final Lock rlock = rwlock.readLock();
+
+    private static final Lock wlock = rwlock.writeLock();
 
     // for show cluster and cache user owned cluster
-    // mysqlUserName -> List of ClusterPB
-    private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = ImmutableMap.of();
     // clusterId -> List<Backend>
     protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
     // clusterName -> clusterId
@@ -104,8 +109,16 @@ public class CloudSystemInfoService extends SystemInfoService {
         }
     }
 
+    public void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+        wlock.lock();
+        try {
+            updateCloudBackendsUnLock(toAdd, toDel);
+        } finally {
+            wlock.unlock();
+        }
+    }
 
-    public synchronized void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+    public void updateCloudBackendsUnLock(List<Backend> toAdd, List<Backend> toDel) {
         // Deduplicate and validate
         if (toAdd.isEmpty() && toDel.isEmpty()) {
             LOG.info("nothing to do");
@@ -115,8 +128,7 @@ public class CloudSystemInfoService extends SystemInfoService {
                 .map(i -> i.getHost() + ":" + i.getHeartbeatPort())
                 .collect(Collectors.toSet());
         if (LOG.isDebugEnabled()) {
-            LOG.debug("deduplication existedBes={}", existedBes);
-            LOG.debug("before deduplication toAdd={} toDel={}", toAdd, toDel);
+            LOG.debug("deduplication existedBes={}, before deduplication toAdd={} toDel={}", existedBes, toAdd, toDel);
         }
         toAdd = toAdd.stream().filter(i -> !existedBes.contains(i.getHost() + ":" + i.getHeartbeatPort()))
                 .collect(Collectors.toList());
@@ -175,92 +187,94 @@ public class CloudSystemInfoService extends SystemInfoService {
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions);
         idToReportVersionRef = newIdToReportVersion;
 
-        updateCloudClusterMap(toAdd, toDel);
+        updateCloudClusterMapNoLock(toAdd, toDel);
     }
 
     private void handleNewBeOnSameNode(Backend oldBe, Backend newBe) {
         LOG.info("new BE {} starts on the same node as existing BE {}", newBe.getId(), oldBe.getId());
-        // TODO(merge-cloud): add it when has CloudTabletRebalancer.
-        // Env.getCurrentEnv().getCloudTabletRebalancer().addTabletMigrationTask(oldBe.getId(), newBe.getId());
+        ((CloudEnv) Env.getCurrentEnv()).getCloudTabletRebalancer()
+                .addTabletMigrationTask(oldBe.getId(), newBe.getId());
     }
 
     public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> toDel) {
-        lock.lock();
+        wlock.lock();
         try {
-            Set<String> clusterNameSet = new HashSet<>();
-            for (Backend b : toAdd) {
-                String clusterName = b.getCloudClusterName();
-                String clusterId = b.getCloudClusterId();
-                if (clusterName.isEmpty() || clusterId.isEmpty()) {
-                    LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
-                    continue;
-                }
-                clusterNameSet.add(clusterName);
-                if (clusterNameSet.size() != 1) {
-                    LOG.warn("toAdd be list have multi clusterName, please check, Set: {}", clusterNameSet);
-                }
+            updateCloudClusterMapNoLock(toAdd, toDel);
+        } finally {
+            wlock.unlock();
+        }
+    }
 
-                clusterNameToId.put(clusterName, clusterId);
-                List<Backend> be = clusterIdToBackend.get(clusterId);
-                if (be == null) {
-                    be = new ArrayList<>();
-                    clusterIdToBackend.put(clusterId, be);
-                    MetricRepo.registerCloudMetrics(clusterId, clusterName);
-                }
-                Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort())
-                        .collect(Collectors.toSet());
-                // Deduplicate
-                // TODO(gavin): consider vpc
-                boolean alreadyExisted = existed.contains(b.getHost() + ":" + b.getHeartbeatPort());
-                if (alreadyExisted) {
-                    LOG.info("BE already existed, clusterName={} clusterId={} backendNum={} backend={}",
-                            clusterName, clusterId, be.size(), b);
-                    continue;
-                }
-                be.add(b);
-                LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+    public void updateCloudClusterMapNoLock(List<Backend> toAdd, List<Backend> toDel) {
+        Set<String> clusterNameSet = new HashSet<>();
+        for (Backend b : toAdd) {
+            String clusterName = b.getCloudClusterName();
+            String clusterId = b.getCloudClusterId();
+            if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
+                continue;
+            }
+            clusterNameSet.add(clusterName);
+            if (clusterNameSet.size() != 1) {
+                LOG.warn("toAdd be list have multi clusterName, please check, Set: {}", clusterNameSet);
+            }
+
+            clusterNameToId.put(clusterName, clusterId);
+            List<Backend> be = clusterIdToBackend.get(clusterId);
+            if (be == null) {
+                be = new ArrayList<>();
+                clusterIdToBackend.put(clusterId, be);
+                MetricRepo.registerCloudMetrics(clusterId, clusterName);
+            }
+            Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort())
+                    .collect(Collectors.toSet());
+            // Deduplicate
+            boolean alreadyExisted = existed.contains(b.getHost() + ":" + b.getHeartbeatPort());
+            if (alreadyExisted) {
+                LOG.info("BE already existed, clusterName={} clusterId={} backendNum={} backend={}",
                         clusterName, clusterId, be.size(), b);
+                continue;
             }
+            be.add(b);
+            LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+                    clusterName, clusterId, be.size(), b);
+        }
 
-            for (Backend b : toDel) {
-                String clusterName = b.getCloudClusterName();
-                String clusterId = b.getCloudClusterId();
-                // We actually don't care about cluster name here
-                if (clusterName.isEmpty() || clusterId.isEmpty()) {
-                    LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
-                    continue;
-                }
-                List<Backend> be = clusterIdToBackend.get(clusterId);
-                if (be == null) {
-                    LOG.warn("try to remove a non-existing cluster, clusterId={} clusterName={}",
-                            clusterId, clusterName);
-                    continue;
-                }
-                Set<Long> d = toDel.stream().map(i -> i.getId()).collect(Collectors.toSet());
-                be = be.stream().filter(i -> !d.contains(i.getId())).collect(Collectors.toList());
-                // ATTN: clusterId may have zero nodes
-                clusterIdToBackend.replace(clusterId, be);
-                // such as dropCluster, but no lock
-                // ATTN: Empty clusters are treated as dropped clusters.
-                if (be.size() == 0) {
-                    LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
-                    boolean succ = clusterNameToId.remove(clusterName, clusterId);
-                    if (!succ) {
-                        LOG.warn("impossible, somewhere err, clusterNameToId {}, "
-                                + "want remove cluster name {}, cluster id {}",
-                                clusterNameToId, clusterName, clusterId);
-                    }
-                    clusterIdToBackend.remove(clusterId);
+        for (Backend b : toDel) {
+            String clusterName = b.getCloudClusterName();
+            String clusterId = b.getCloudClusterId();
+            // We actually don't care about cluster name here
+            if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
+                continue;
+            }
+            List<Backend> be = clusterIdToBackend.get(clusterId);
+            if (be == null) {
+                LOG.warn("try to remove a non-existing cluster, clusterId={} clusterName={}",
+                        clusterId, clusterName);
+                continue;
+            }
+            Set<Long> d = toDel.stream().map(Backend::getId).collect(Collectors.toSet());
+            be = be.stream().filter(i -> !d.contains(i.getId())).collect(Collectors.toList());
+            // ATTN: clusterId may have zero nodes
+            clusterIdToBackend.replace(clusterId, be);
+            // such as dropCluster, but no lock
+            // ATTN: Empty clusters are treated as dropped clusters.
+            if (be.isEmpty()) {
+                LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
+                boolean succ = clusterNameToId.remove(clusterName, clusterId);
+                if (!succ) {
+                    LOG.warn("impossible, somewhere err, clusterNameToId {}, "
+                            + "want remove cluster name {}, cluster id {}",
+                            clusterNameToId, clusterName, clusterId);
                 }
-                LOG.info("update (del) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
-                        clusterName, clusterId, be.size(), b);
+                clusterIdToBackend.remove(clusterId);
             }
-        } finally {
-            lock.unlock();
+            LOG.info("update (del) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+                    clusterName, clusterId, be.size(), b);
         }
     }
 
-
     public synchronized void updateFrontends(List<Frontend> toAdd, List<Frontend> toDel)
             throws DdlException {
         if (LOG.isDebugEnabled()) {
@@ -321,15 +335,26 @@ public class CloudSystemInfoService extends SystemInfoService {
         if (FeConstants.runningUnitTest) {
             return true;
         }
-        if (null == clusterNameToId || clusterNameToId.isEmpty()) {
-            return false;
+        rlock.lock();
+        try {
+            if (null == clusterNameToId || clusterNameToId.isEmpty()) {
+                return false;
+            }
+
+            return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
+                    && clusterIdToBackend.values().stream().anyMatch(list -> list != null && !list.isEmpty());
+        } finally {
+            rlock.unlock();
         }
-        return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
-                && clusterIdToBackend.values().stream().anyMatch(list -> list != null && !list.isEmpty());
     }
 
     public boolean containClusterName(String clusterName) {
-        return clusterNameToId.containsKey(clusterName);
+        rlock.lock();
+        try {
+            return clusterNameToId.containsKey(clusterName);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     @Override
@@ -370,57 +395,98 @@ public class CloudSystemInfoService extends SystemInfoService {
     }
 
     public List<Backend> getBackendsByClusterName(final String clusterName) {
-        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
-        if (clusterId.isEmpty()) {
-            return new ArrayList<>();
+        rlock.lock();
+        try {
+            String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+            if (clusterId.isEmpty()) {
+                return new ArrayList<>();
+            }
+            // copy a new List
+            return new ArrayList<>(clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>()));
+        } finally {
+            rlock.unlock();
         }
-        return clusterIdToBackend.get(clusterId);
     }
 
     public List<Backend> getBackendsByClusterId(final String clusterId) {
-        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
+        rlock.lock();
+        try {
+            // copy a new List
+            return new ArrayList<>(clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>()));
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public String getClusterIdByBeAddr(String beEndpoint) {
-        for (Map.Entry<String, List<Backend>> idBe : clusterIdToBackend.entrySet()) {
-            if (idBe.getValue().stream().anyMatch(be -> be.getAddress().equals(beEndpoint))) {
-                return getClusterNameByClusterId(idBe.getKey());
+        rlock.lock();
+        try {
+            for (Map.Entry<String, List<Backend>> idBe : clusterIdToBackend.entrySet()) {
+                if (idBe.getValue().stream().anyMatch(be -> be.getAddress().equals(beEndpoint))) {
+                    return getClusterNameByClusterIdNoLock(idBe.getKey());
+                }
             }
+            return null;
+        } finally {
+            rlock.unlock();
         }
-        return null;
     }
 
     public List<String> getCloudClusterIds() {
-        return new ArrayList<>(clusterIdToBackend.keySet());
+        rlock.lock();
+        try {
+            return new ArrayList<>(clusterIdToBackend.keySet());
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public String getCloudStatusByName(final String clusterName) {
-        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
-        if (Strings.isNullOrEmpty(clusterId)) {
-            // for rename cluster or dropped cluster
-            LOG.warn("cant find clusterId by clusterName {}", clusterName);
-            return "";
+        rlock.lock();
+        try {
+            String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+            if (Strings.isNullOrEmpty(clusterId)) {
+                // for rename cluster or dropped cluster
+                LOG.warn("cant find clusterId by clusterName {}", clusterName);
+                return "";
+            }
+            return getCloudStatusById(clusterId);
+        } finally {
+            rlock.unlock();
         }
-        return getCloudStatusById(clusterId);
     }
 
     public String getCloudStatusById(final String clusterId) {
-        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
-                .stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
+        rlock.lock();
+        try {
+            return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
+                    .stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public void updateClusterNameToId(final String newName, final String originalName, final String clusterId) {
-        lock.lock();
+        wlock.lock();
         try {
             clusterNameToId.remove(originalName);
             clusterNameToId.put(newName, clusterId);
         } finally {
-            lock.unlock();
+            wlock.unlock();
        }
     }
 
     public String getClusterNameByClusterId(final String clusterId) {
+        rlock.lock();
+        try {
+            return getClusterNameByClusterIdNoLock(clusterId);
+        } finally {
+            rlock.unlock();
+        }
+    }
+
+    public String getClusterNameByClusterIdNoLock(final String clusterId) {
         String clusterName = "";
         for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
             if (entry.getValue().equals(clusterId)) {
@@ -432,55 +498,64 @@ public class CloudSystemInfoService extends SystemInfoService {
     }
 
     public void dropCluster(final String clusterId, final String clusterName) {
-        lock.lock();
+        wlock.lock();
         try {
             clusterNameToId.remove(clusterName, clusterId);
             clusterIdToBackend.remove(clusterId);
         } finally {
-            lock.unlock();
+            wlock.unlock();
         }
     }
 
     public List<String> getCloudClusterNames() {
-        return new ArrayList<>(clusterNameToId.keySet());
+        rlock.lock();
+        try {
+            return new ArrayList<>(clusterNameToId.keySet()).stream().filter(c -> !Strings.isNullOrEmpty(c))
+                    .sorted(Comparator.naturalOrder()).collect(Collectors.toList());
+        } finally {
+            rlock.unlock();
+        }
     }
 
     // use cluster $clusterName
     // return clusterName for userName
     public String addCloudCluster(final String clusterName, final String userName) throws UserException {
-        lock.lock();
         if ((Strings.isNullOrEmpty(clusterName) && Strings.isNullOrEmpty(userName))
                 || (!Strings.isNullOrEmpty(clusterName) && !Strings.isNullOrEmpty(userName))) {
             // clusterName or userName just only need one.
-            lock.unlock();
             LOG.warn("addCloudCluster args err clusterName {}, userName {}", clusterName, userName);
             return "";
         }
-        // First time this method is called, build cloud cluster map
-        if (clusterNameToId.isEmpty() || clusterIdToBackend.isEmpty()) {
-            List<Backend> toAdd = Maps.newHashMap(idToBackendRef)
-                    .values().stream()
-                    .filter(i -> i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_ID))
-                    .filter(i -> i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_NAME))
-                    .collect(Collectors.toList());
-            // The larger bakendId the later it was added, the order matters
-            toAdd.sort((x, y) -> (int) (x.getId() - y.getId()));
-            updateCloudClusterMap(toAdd, new ArrayList<>());
-        }
 
         String clusterId;
-        if (Strings.isNullOrEmpty(userName)) {
-            // use clusterName
-            LOG.info("try to add a cloud cluster, clusterName={}", clusterName);
-            clusterId = clusterNameToId.get(clusterName);
-            clusterId = clusterId == null ? "" : clusterId;
-            if (clusterIdToBackend.containsKey(clusterId)) { // Cluster already added
-                lock.unlock();
-                LOG.info("cloud cluster already added, clusterName={}, clusterId={}", clusterName, clusterId);
-                return "";
+        wlock.lock();
+        try {
+            // First time this method is called, build cloud cluster map
+            if (clusterNameToId.isEmpty() || clusterIdToBackend.isEmpty()) {
+                List<Backend> toAdd = Maps.newHashMap(idToBackendRef)
+                        .values().stream()
+                        .filter(i -> i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_ID))
+                        .filter(i -> i.getTagMap().containsKey(Tag.CLOUD_CLUSTER_NAME))
+                        .collect(Collectors.toList());
+                // The larger the backendId, the later it was added; the order matters
+                toAdd.sort((x, y) -> (int) (x.getId() - y.getId()));
+                updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+            }
+
+            if (Strings.isNullOrEmpty(userName)) {
+                // use clusterName
+                LOG.info("try to add a cloud cluster, clusterName={}", clusterName);
+                clusterId = clusterNameToId.get(clusterName);
+                clusterId = clusterId == null ? "" : clusterId;
+                if (clusterIdToBackend.containsKey(clusterId)) { // Cluster already added
+                    LOG.info("cloud cluster already added, clusterName={}, clusterId={}", clusterName, clusterId);
+                    return "";
+                }
             }
+        } finally {
+            wlock.unlock();
         }
-        lock.unlock();
+
         LOG.info("begin to get cloud cluster from remote, clusterName={}, userName={}", clusterName, userName);
 
         // Get cloud cluster info from resource manager
@@ -505,11 +580,11 @@ public class CloudSystemInfoService extends SystemInfoService {
         String clusterNameMeta = cpb.getClusterName();
 
         // Prepare backends
-        Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
-        newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
-        newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
         List<Backend> backends = new ArrayList<>();
         for (Cloud.NodeInfoPB node : cpb.getNodesList()) {
+            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterNameMeta);
+            newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
             Backend b = new Backend(Env.getCurrentEnv().getNextId(), node.getIp(), node.getHeartbeatPort());
             b.setTagMap(newTagMap);
             backends.add(b);
@@ -521,19 +596,35 @@ public class CloudSystemInfoService extends SystemInfoService {
         return clusterNameMeta;
     }
 
-    // Return the ref of concurrentMap clusterIdToBackend
-    // It should be thread-safe to iterate.
-    // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
     public Map<String, List<Backend>> getCloudClusterIdToBackend() {
-        return clusterIdToBackend;
+        rlock.lock();
+        try {
+            return new ConcurrentHashMap<>(clusterIdToBackend);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public String getCloudClusterIdByName(String clusterName) {
-        return clusterNameToId.get(clusterName);
+        rlock.lock();
+        try {
+            return clusterNameToId.get(clusterName);
+        } finally {
+            rlock.unlock();
+        }
    }
 
     public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) {
+        rlock.lock();
+        try {
+            return getCloudIdToBackendNoLock(clusterName);
+        } finally {
+            rlock.unlock();
+        }
+    }
+
+    public ImmutableMap<Long, Backend> getCloudIdToBackendNoLock(String clusterName) {
         String clusterId = clusterNameToId.get(clusterName);
         if (Strings.isNullOrEmpty(clusterId)) {
             LOG.warn("cant find clusterId, this cluster may be has been dropped, clusterName={}", clusterName);
@@ -548,18 +639,13 @@ public class CloudSystemInfoService extends SystemInfoService {
     }
 
     // Return the ref of concurrentMap clusterNameToId
-    // It should be thread-safe to iterate.
-    // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
     public Map<String, String> getCloudClusterNameToId() {
-        return clusterNameToId;
-    }
-
-    public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() {
-        return mysqlUserNameToClusterPB;
-    }
-
-    public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m) {
-        mysqlUserNameToClusterPB = m;
+        rlock.lock();
+        try {
+            return new ConcurrentHashMap<>(clusterNameToId);
+        } finally {
+            rlock.unlock();
+        }
     }
 
     public List<Pair<String, Integer>> getCurrentObFrontends() {
@@ -724,6 +810,4 @@ public class CloudSystemInfoService extends SystemInfoService {
             LOG.info("auto start cluster {}, start cost {} ms", clusterName, stopWatch.getTime());
         }
     }
-
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index b5076466d2b..646d0235b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.TableAttributes;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.VariantType;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -481,6 +482,9 @@ public class StatisticsUtil {
             return false;
         }
         if (Config.isCloudMode()) {
+            if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).availableBackendsExists()) {
+                return false;
+            }
             try (AutoCloseConnectContext r = buildConnectContext()) {
                 r.connectContext.getCloudCluster();
                 for (OlapTable table : statsTbls) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
