This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 064717f2803 [feature](merge-cloud) Add `CloudClusterChecker` to sync cluster info… (#30243)
064717f2803 is described below
commit 064717f2803f7448a5eb18ba59b704ebeb8e2890
Author: deardeng <[email protected]>
AuthorDate: Tue Jan 23 23:49:11 2024 +0800
[feature](merge-cloud) Add `CloudClusterChecker` to sync cluster info… (#30243)
Co-authored-by: Lightman <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 8 +
.../java/org/apache/doris/alter/SystemHandler.java | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 39 +-
.../doris/cloud/catalog/CloudClusterChecker.java | 474 +++++++++++++++++++++
.../org/apache/doris/cloud/catalog/CloudEnv.java | 311 ++++++++++++++
.../doris/cloud/system/CloudSystemInfoService.java | 209 +++++++++
.../org/apache/doris/deploy/DeployManager.java | 4 +-
.../doris/httpv2/rest/manager/NodeAction.java | 2 +-
.../java/org/apache/doris/metric/MetricRepo.java | 106 +++++
.../main/java/org/apache/doris/system/Backend.java | 8 +
.../org/apache/doris/system/SystemInfoService.java | 14 +-
11 files changed, 1151 insertions(+), 28 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 46194c484bd..0afede3905a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2476,6 +2476,14 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_cloud_snapshot_version = true;
+    @ConfField
+    public static int cloud_cluster_check_interval_second = 10;
+
+    @ConfField
+    public static String cloud_sql_server_cluster_name = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER";
+
+    @ConfField
+    public static String cloud_sql_server_cluster_id = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER";
//==========================================================================
// end of cloud config
//==========================================================================
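
The three options above are ordinary FE config knobs: the interval controls how often the new CloudClusterChecker daemon polls the meta service, and the two RESERVED_* values name the pseudo-cluster under which FE (SQL server) nodes are registered. A minimal fe.conf sketch of how a deployment might set them; the override value below is an illustrative assumption, not something from this commit:

    # poll the meta service for cluster topology every 5 seconds (hypothetical override; the default is 10)
    cloud_cluster_check_interval_second = 5
    # reserved pseudo-cluster that holds FE (SQL server) nodes in the meta service
    cloud_sql_server_cluster_name = RESERVED_CLUSTER_NAME_FOR_SQL_SERVER
    cloud_sql_server_cluster_id = RESERVED_CLUSTER_ID_FOR_SQL_SERVER
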
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index 86551ba0735..e503e093787 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -152,7 +152,7 @@ public class SystemHandler extends AlterHandler {
         } else if (alterClause instanceof AddObserverClause) {
             AddObserverClause clause = (AddObserverClause) alterClause;
             Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(),
-                    clause.getPort());
+                    clause.getPort(), "");
         } else if (alterClause instanceof DropObserverClause) {
             DropObserverClause clause = (DropObserverClause) alterClause;
             Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(),
@@ -160,7 +160,7 @@ public class SystemHandler extends AlterHandler {
         } else if (alterClause instanceof AddFollowerClause) {
             AddFollowerClause clause = (AddFollowerClause) alterClause;
             Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(),
-                    clause.getPort());
+                    clause.getPort(), "");
         } else if (alterClause instanceof DropFollowerClause) {
             DropFollowerClause clause = (DropFollowerClause) alterClause;
             Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 55eb05ba396..9fc98db5f6b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -330,7 +330,7 @@ public class Env {
private String metaDir;
private String bdbDir;
- private String imageDir;
+ protected String imageDir;
private MetaContext metaContext;
private long epoch = 0;
@@ -380,8 +380,8 @@ public class Env {
private ColumnIdFlushDaemon columnIdFlusher;
- private boolean isFirstTimeStartUp = false;
- private boolean isElectable;
+ protected boolean isFirstTimeStartUp = false;
+ protected boolean isElectable;
// set to true after finished replay all meta and ready to serve
// set to false when catalog is not ready.
private AtomicBoolean isReady = new AtomicBoolean(false);
@@ -395,8 +395,8 @@ public class Env {
private BlockingQueue<FrontendNodeType> typeTransferQueue;
// node name is used for bdbje NodeName.
- private String nodeName;
- private FrontendNodeType role;
+ protected String nodeName;
+ protected FrontendNodeType role;
private FrontendNodeType feType;
     // replica and observer use this value to decide provide read service or not
private long synchronizedTimeMs;
@@ -405,19 +405,19 @@ public class Env {
     private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE);
private EditLog editLog;
- private int clusterId;
- private String token;
+ protected int clusterId;
+ protected String token;
// For checkpoint and observer memory replayed marker
private AtomicLong replayedJournalId;
private static Env CHECKPOINT = null;
private static long checkpointThreadId = -1;
private Checkpoint checkpointer;
- private List<HostInfo> helperNodes = Lists.newArrayList();
- private HostInfo selfNode = null;
+ protected List<HostInfo> helperNodes = Lists.newArrayList();
+ protected HostInfo selfNode = null;
// node name -> Frontend
- private ConcurrentHashMap<String, Frontend> frontends;
+ protected ConcurrentHashMap<String, Frontend> frontends;
     // removed frontends' name. used for checking if name is duplicated in bdbje
private ConcurrentLinkedQueue<String> removedFrontends;
@@ -1067,7 +1067,7 @@ public class Env {
this.httpReady.set(httpReady);
}
- private void getClusterIdAndRole() throws IOException {
+ protected void getClusterIdAndRole() throws IOException {
File roleFile = new File(this.imageDir, Storage.ROLE_FILE);
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
@@ -1278,7 +1278,7 @@ public class Env {
// Get the role info and node name from helper node.
// return false if failed.
- private boolean getFeNodeTypeAndNameFromHelpers() {
+ protected boolean getFeNodeTypeAndNameFromHelpers() {
         // we try to get info from helper nodes, once we get the right helper node,
// other helper nodes will be ignored and removed.
HostInfo rightHelperNode = null;
@@ -1584,7 +1584,7 @@ public class Env {
}
// start all daemon threads only running on Master
- private void startMasterOnlyDaemonThreads() {
+ protected void startMasterOnlyDaemonThreads() {
// start checkpoint thread
checkpointer = new Checkpoint(editLog);
checkpointer.setMetaContext(metaContext);
@@ -1781,7 +1781,7 @@ public class Env {
}
}
-    private boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException {
+    protected boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException {
         try {
             String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port)
                     + "/version";
@@ -1797,7 +1797,7 @@ public class Env {
return false;
}
- private void getNewImage(HostInfo helperNode) throws IOException {
+ protected void getNewImage(HostInfo helperNode) throws IOException {
long localImageVersion = 0;
Storage storage = new Storage(this.imageDir);
localImageVersion = storage.getLatestImageSeq();
@@ -1829,7 +1829,7 @@ public class Env {
}
}
- private boolean isMyself() {
+ protected boolean isMyself() {
Preconditions.checkNotNull(selfNode);
Preconditions.checkNotNull(helperNodes);
LOG.debug("self: {}. helpers: {}", selfNode, helperNodes);
@@ -2762,7 +2762,7 @@ public class Env {
};
}
-    public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
+    public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
@@ -2774,7 +2774,10 @@ public class Env {
             if (Config.enable_fqdn_mode && StringUtils.isEmpty(host)) {
                 throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true");
             }
-            String nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
+
+            if (Strings.isNullOrEmpty(nodeName)) {
+                nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
+            }
 
             if (removedFrontends.contains(nodeName)) {
                 throw new DdlException("frontend name already exists " + nodeName + ". Try again");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
new file mode 100644
index 00000000000..527909c7bf0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
+import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.metric.GaugeMetricImpl;
+import org.apache.doris.metric.Metric.MetricUnit;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.Frontend;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class CloudClusterChecker extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CloudClusterChecker.class);
+
+    public CloudClusterChecker() {
+        super("cloud cluster check", Config.cloud_cluster_check_interval_second * 1000L);
+    }
+
+    /**
+     * Diff 2 collections of current and the dest.
+     * @param toAdd output param = (expectedState - currentState)
+     * @param toDel output param = (currentState - expectedState)
+     * @param supplierCurrentMapFunc a lambda that returns the current BE or FE information map, read from memory
+     * @param supplierNodeMapFunc a lambda that returns the BE or FE information map built from the meta_service PB
+     */
+    private <T> void diffNodes(List<T> toAdd, List<T> toDel, Supplier<Map<String, T>> supplierCurrentMapFunc,
+            Supplier<Map<String, T>> supplierNodeMapFunc) {
+        if (toAdd == null || toDel == null) {
+            return;
+        }
+
+        // TODO(gavin): Consider VPC
+        // vpc:ip:port -> Nodes
+        Map<String, T> currentMap = supplierCurrentMapFunc.get();
+        Map<String, T> nodeMap = supplierNodeMapFunc.get();
+
+        LOG.debug("current Nodes={} expected Nodes={}", currentMap.keySet(), nodeMap.keySet());
+
+        toDel.addAll(currentMap.keySet().stream().filter(i -> !nodeMap.containsKey(i))
+                .map(currentMap::get).collect(Collectors.toList()));
+
+        toAdd.addAll(nodeMap.keySet().stream().filter(i -> !currentMap.containsKey(i))
+                .map(nodeMap::get).collect(Collectors.toList()));
+    }
+
+    private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<String> localClusterIds) {
+        List<String> toAddClusterIds = remoteClusterIdToPB.keySet().stream()
+                .filter(i -> !localClusterIds.contains(i)).collect(Collectors.toList());
+        toAddClusterIds.forEach(
+                addId -> {
+                    LOG.debug("begin to add clusterId: {}", addId);
+                    // Attach tag to BEs
+                    Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+                    String clusterName = remoteClusterIdToPB.get(addId).getClusterName();
+                    String clusterId = remoteClusterIdToPB.get(addId).getClusterId();
+                    String publicEndpoint = remoteClusterIdToPB.get(addId).getPublicEndpoint();
+                    String privateEndpoint = remoteClusterIdToPB.get(addId).getPrivateEndpoint();
+                    newTagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+                    newTagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+                    newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, publicEndpoint);
+                    newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, privateEndpoint);
+                    // For old versions that do not have the status field set
+                    ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
+                            ? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
+                    newTagMap.put(Tag.CLOUD_CLUSTER_STATUS, String.valueOf(clusterStatus));
+                    MetricRepo.registerClusterMetrics(clusterName, clusterId);
+                    //toAdd.forEach(i -> i.setTagMap(newTagMap));
+                    List<Backend> toAdd = new ArrayList<>();
+                    for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) {
+                        String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+                        if (Strings.isNullOrEmpty(addr)) {
+                            LOG.warn("cant get valid add from ms {}", node);
+                            continue;
+                        }
+                        Backend b = new Backend(Env.getCurrentEnv().getNextId(), addr, node.getHeartbeatPort());
+                        newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
+                        b.setTagMap(newTagMap);
+                        toAdd.add(b);
+                    }
+                    Env.getCurrentSystemInfo().updateCloudBackends(toAdd, new ArrayList<>());
+                }
+        );
+    }
+
+    private void checkToDelCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<String> localClusterIds,
+                                   Map<String, List<Backend>> clusterIdToBackend) {
+        List<String> toDelClusterIds = localClusterIds.stream()
+                .filter(i -> !remoteClusterIdToPB.containsKey(i)).collect(Collectors.toList());
+        // drop be cluster
+        Map<String, List<Backend>> finalClusterIdToBackend = clusterIdToBackend;
+        toDelClusterIds.forEach(
+                delId -> {
+                    LOG.debug("begin to drop clusterId: {}", delId);
+                    List<Backend> toDel =
+                            new ArrayList<>(finalClusterIdToBackend.getOrDefault(delId, new ArrayList<>()));
+                    Env.getCurrentSystemInfo().updateCloudBackends(new ArrayList<>(), toDel);
+                    // del clusterName
+                    String delClusterName = Env.getCurrentSystemInfo().getClusterNameByClusterId(delId);
+                    if (delClusterName.isEmpty()) {
+                        LOG.warn("can't get delClusterName, clusterId: {}, plz check", delId);
+                        return;
+                    }
+                    // del clusterID
+                    Env.getCurrentSystemInfo().dropCluster(delId, delClusterName);
+                }
+        );
+    }
+
+    private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expectedBes) {
+        Map<String, Backend> currentMap = new HashMap<>();
+        for (Backend be : currentBes) {
+            String endpoint = be.getHost() + ":" + be.getHeartbeatPort();
+            currentMap.put(endpoint, be);
+        }
+
+        for (Cloud.NodeInfoPB node : expectedBes) {
+            String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+            if (Strings.isNullOrEmpty(addr)) {
+                LOG.warn("cant get valid add from ms {}", node);
+                continue;
+            }
+            String endpoint = addr + ":" + node.getHeartbeatPort();
+            Cloud.NodeStatusPB status = node.getStatus();
+            Backend be = currentMap.get(endpoint);
+
+            if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
+                if (!be.isDecommissioned()) {
+                    LOG.info("decommissioned backend: {} status: {}", be, status);
+                    // TODO(merge-cloud): add it when has CloudUpgradeMgr.
+                    /*
+                    try {
+                    } catch (AnalysisException e) {
+                        LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
+                    }
+                    be.setDecommissioned(true);
+                    */
+                }
+            }
+        }
+    }
+
+    private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,
+                               Map<String, List<Backend>> clusterIdToBackend) {
+        for (String cid : clusterIdToBackend.keySet()) {
+            List<Backend> toAdd = new ArrayList<>();
+            List<Backend> toDel = new ArrayList<>();
+            ClusterPB cp = remoteClusterIdToPB.get(cid);
+            if (cp == null) {
+                LOG.warn("can't get cid {} info, and local cluster info {}, remote cluster info {}",
+                        cid, clusterIdToBackend, remoteClusterIdToPB);
+                continue;
+            }
+            String newClusterName = cp.getClusterName();
+            List<Backend> currentBes = clusterIdToBackend.getOrDefault(cid, new ArrayList<>());
+            String currentClusterName = currentBes.stream().map(Backend::getCloudClusterName).findFirst().orElse("");
+
+            if (!newClusterName.equals(currentClusterName)) {
+                // rename cluster's name
+                LOG.info("cluster_name corresponding to cluster_id has been changed,"
+                        + " cluster_id : {} , current_cluster_name : {}, new_cluster_name :{}",
+                        cid, currentClusterName, newClusterName);
+                // change all be's cluster_name
+                currentBes.forEach(b -> b.setCloudClusterName(newClusterName));
+                // update clusterNameToId
+                Env.getCurrentSystemInfo().updateClusterNameToId(newClusterName, currentClusterName, cid);
+                // update tags
+                currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b));
+            }
+
+            String currentClusterStatus = Env.getCurrentSystemInfo().getCloudStatusById(cid);
+
+            // For old versions that do not have the status field set
+            ClusterStatus clusterStatus = cp.hasClusterStatus() ? cp.getClusterStatus() : ClusterStatus.NORMAL;
+            String newClusterStatus = String.valueOf(clusterStatus);
+            LOG.debug("current cluster status {} {}", currentClusterStatus, newClusterStatus);
+            if (!currentClusterStatus.equals(newClusterStatus)) {
+                // cluster's status changed
+                LOG.info("cluster_status corresponding to cluster_id has been changed,"
+                        + " cluster_id : {} , current_cluster_status : {}, new_cluster_status :{}",
+                        cid, currentClusterStatus, newClusterStatus);
+                // change all be's cluster_status
+                currentBes.forEach(b -> b.setCloudClusterStatus(newClusterStatus));
+                // update tags
+                currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b));
+            }
+
+            List<String> currentBeEndpoints = currentBes.stream().map(backend ->
+                    backend.getHost() + ":" + backend.getHeartbeatPort()).collect(Collectors.toList());
+            List<Cloud.NodeInfoPB> expectedBes = remoteClusterIdToPB.get(cid).getNodesList();
+            List<String> remoteBeEndpoints = expectedBes.stream()
+                    .map(pb -> {
+                        String addr = Config.enable_fqdn_mode ? pb.getHost() : pb.getIp();
+                        if (Strings.isNullOrEmpty(addr)) {
+                            LOG.warn("cant get valid add from ms {}", pb);
+                            return "";
+                        }
+                        return addr + ":" + pb.getHeartbeatPort();
+                    }).filter(e -> !Strings.isNullOrEmpty(e))
+                    .collect(Collectors.toList());
+            LOG.info("get cloud cluster, clusterId={} local nodes={} remote nodes={}", cid,
+                    currentBeEndpoints, remoteBeEndpoints);
+
+            updateStatus(currentBes, expectedBes);
+
+            // Attach tag to BEs
+            Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+            newTagMap.put(Tag.CLOUD_CLUSTER_NAME, remoteClusterIdToPB.get(cid).getClusterName());
+            newTagMap.put(Tag.CLOUD_CLUSTER_ID, remoteClusterIdToPB.get(cid).getClusterId());
+            newTagMap.put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remoteClusterIdToPB.get(cid).getPublicEndpoint());
+            newTagMap.put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, remoteClusterIdToPB.get(cid).getPrivateEndpoint());
+
+            diffNodes(toAdd, toDel, () -> {
+                Map<String, Backend> currentMap = new HashMap<>();
+                for (Backend be : currentBes) {
+                    String endpoint = be.getHost() + ":" + be.getHeartbeatPort()
+                            + be.getCloudPublicEndpoint() + be.getCloudPrivateEndpoint();
+                    currentMap.put(endpoint, be);
+                }
+                return currentMap;
+            }, () -> {
+                Map<String, Backend> nodeMap = new HashMap<>();
+                for (Cloud.NodeInfoPB node : expectedBes) {
+                    String host = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+                    if (Strings.isNullOrEmpty(host)) {
+                        LOG.warn("cant get valid add from ms {}", node);
+                        continue;
+                    }
+                    String endpoint = host + ":" + node.getHeartbeatPort()
+                            + remoteClusterIdToPB.get(cid).getPublicEndpoint()
+                            + remoteClusterIdToPB.get(cid).getPrivateEndpoint();
+                    Backend b = new Backend(Env.getCurrentEnv().getNextId(), host, node.getHeartbeatPort());
+                    if (node.hasIsSmoothUpgrade()) {
+                        b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
+                    }
+                    newTagMap.put(Tag.CLOUD_UNIQUE_ID, node.getCloudUniqueId());
+                    b.setTagMap(newTagMap);
+                    nodeMap.put(endpoint, b);
+                }
+                return nodeMap;
+            });
+
+            LOG.debug("cluster_id: {}, diffBackends nodes: {}, current: {}, toAdd: {}, toDel: {}",
+                    cid, expectedBes, currentBes, toAdd, toDel);
+            if (toAdd.isEmpty() && toDel.isEmpty()) {
+                LOG.debug("runAfterCatalogReady nothing todo");
+                continue;
+            }
+
+            Env.getCurrentSystemInfo().updateCloudBackends(toAdd, toDel);
+        }
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        // rpc to ms, to get the cluster_ids that a mysql user can use
+        // NOTE: rpc args are all empty; use cluster_unique_id to get all of an instance's cluster info.
+        Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
+                && response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, response={}", Config.cloud_unique_id, response);
+        } else {
+            // clusterId -> clusterPB
+            Map<String, ClusterPB> remoteClusterIdToPB = new HashMap<>();
+            Set<String> localClusterIds = clusterIdToBackend.keySet();
+
+            try {
+                // cluster_ids diff remote <clusterId, nodes> and local <clusterId, nodes>
+                // remote - local > 0, add bes to local
+                checkToAddCluster(remoteClusterIdToPB, localClusterIds);
+
+                // local - remote > 0, drop bes from local
+                checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend);
+
+                if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) {
+                    LOG.warn("impossible cluster id size not match, check it local {}, remote {}",
+                            clusterIdToBackend, remoteClusterIdToPB);
+                }
+                // clusterID local == remote, diff nodes
+                checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
+
+                // check mem map
+                checkFeNodesMapValid();
+            } catch (Exception e) {
+                LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
+            }
+        }
+
+        // Metric
+        clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        Map<String, String> clusterNameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId();
+        for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+            long aliveNum = 0L;
+            List<Backend> bes = clusterIdToBackend.get(entry.getValue());
+            if (bes == null || bes.size() == 0) {
+                LOG.info("cant get be nodes by cluster {}, bes {}", entry, bes);
+                continue;
+            }
+            for (Backend backend : bes) {
+                MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> {
+                    GaugeMetricImpl<Integer> backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
+                            "backend alive or not");
+                    backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue()));
+                    backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey()));
+                    backendAlive.addLabel(new MetricLabel("address", key));
+                    MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
+                    return backendAlive;
+                }).setValue(backend.isAlive() ? 1 : 0);
+                aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
+            }
+
+            MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> {
+                GaugeMetricImpl<Long> backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total",
+                        MetricUnit.NOUNIT, "backend alive num in cluster");
+                backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue()));
+                backendAliveTotal.addLabel(new MetricLabel("cluster_name", key));
+                MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
+                return backendAliveTotal;
+            }).setValue(aliveNum);
+        }
+
+        LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {}",
+                Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
+        getObserverFes();
+    }
+
+    private void checkFeNodesMapValid() {
+        LOG.debug("begin checkFeNodesMapValid");
+        Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        Set<String> clusterIds = new HashSet<>();
+        Set<String> clusterNames = new HashSet<>();
+        clusterIdToBackend.forEach((clusterId, bes) -> {
+            if (bes.isEmpty()) {
+                LOG.warn("impossible, somewhere err, clusterId {}, clusterIdToBeMap {}", clusterId, clusterIdToBackend);
+                clusterIdToBackend.remove(clusterId);
+            }
+            bes.forEach(be -> {
+                clusterIds.add(be.getCloudClusterId());
+                clusterNames.add(be.getCloudClusterName());
+            });
+        });
+
+        Map<String, String> nameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId();
+        nameToId.forEach((clusterName, clusterId) -> {
+            if (!clusterIdToBackend.containsKey(clusterId)) {
+                LOG.warn("impossible, somewhere err, clusterId {}, clusterName {}, clusterNameToIdMap {}",
+                        clusterId, clusterName, nameToId);
+                nameToId.remove(clusterName);
+            }
+        });
+
+        if (!clusterNames.containsAll(nameToId.keySet()) || !nameToId.keySet().containsAll(clusterNames)) {
+            LOG.warn("impossible, somewhere err, clusterNames {}, nameToId {}", clusterNames, nameToId);
+        }
+        if (!clusterIds.containsAll(nameToId.values()) || !nameToId.values().containsAll(clusterIds)) {
+            LOG.warn("impossible, somewhere err, clusterIds {}, nameToId {}", clusterIds, nameToId);
+        }
+        if (!clusterIds.containsAll(clusterIdToBackend.keySet())
+                || !clusterIdToBackend.keySet().containsAll(clusterIds)) {
+            LOG.warn("impossible, somewhere err, clusterIds {}, clusterIdToBackend {}",
+                    clusterIds, clusterIdToBackend);
+        }
+    }
+
+    private void getObserverFes() {
+        Cloud.GetClusterResponse response = CloudSystemInfoService
+                .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return;
+        }
+        // Note: get_cluster interface cluster(option -> repeated), so it has at least one cluster.
+        if (response.getClusterCount() == 0) {
+            LOG.warn("meta service error , return cluster zero, plz check it, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return;
+        }
+
+        ClusterPB cpb = response.getCluster(0);
+        LOG.debug("get cloud cluster, clusterId={} nodes={}", Config.cloud_sql_server_cluster_id, cpb.getNodesList());
+        List<Frontend> currentFes = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+        List<Frontend> toAdd = new ArrayList<>();
+        List<Frontend> toDel = new ArrayList<>();
+        List<Cloud.NodeInfoPB> expectedFes = cpb.getNodesList();
+        diffNodes(toAdd, toDel, () -> {
+            Map<String, Frontend> currentMap = new HashMap<>();
+            String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
+            for (Frontend fe : currentFes) {
+                String endpoint = fe.getHost() + "_" + fe.getEditLogPort();
+                if (selfNode.equals(endpoint)) {
+                    continue;
+                }
+                currentMap.put(endpoint, fe);
+            }
+            return currentMap;
+        }, () -> {
+            Map<String, Frontend> nodeMap = new HashMap<>();
+            String selfNode = Env.getCurrentEnv().getSelfNode().getIdent();
+            for (Cloud.NodeInfoPB node : expectedFes) {
+                String host = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
+                if (Strings.isNullOrEmpty(host)) {
+                    LOG.warn("cant get valid add from ms {}", node);
+                    continue;
+                }
+                String endpoint = host + "_" + node.getEditLogPort();
+                if (selfNode.equals(endpoint)) {
+                    continue;
+                }
+                Frontend fe = new Frontend(FrontendNodeType.OBSERVER,
+                        CloudEnv.genFeNodeNameFromMeta(host, node.getEditLogPort(),
+                        node.getCtime() * 1000), host, node.getEditLogPort());
+                nodeMap.put(endpoint, fe);
+            }
+            return nodeMap;
+        });
+        LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}",
+                expectedFes, currentFes, toAdd, toDel);
+        if (toAdd.isEmpty() && toDel.isEmpty()) {
+            LOG.debug("runAfterCatalogReady getObserverFes nothing todo");
+            return;
+        }
+        try {
+            CloudSystemInfoService.updateFrontends(toAdd, toDel);
+        } catch (DdlException e) {
+            LOG.warn("update cloud frontends exception e: {}, msg: {}", e, e.getMessage());
+        }
+    }
+}
+
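
The reconciliation above rests on diffNodes, which is a plain keyed set difference: anything only in the local map is slated for deletion, anything only in the meta-service map for addition. A standalone sketch of the same computation with simplified types (names and endpoint values here are illustrative, not part of the commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Supplier;
    import java.util.stream.Collectors;

    class DiffNodesSketch {
        // toDel = current - expected, toAdd = expected - current, keyed by endpoint string
        static <T> void diff(List<T> toAdd, List<T> toDel,
                Supplier<Map<String, T>> currentFunc, Supplier<Map<String, T>> expectedFunc) {
            Map<String, T> current = currentFunc.get();
            Map<String, T> expected = expectedFunc.get();
            toDel.addAll(current.keySet().stream().filter(k -> !expected.containsKey(k))
                    .map(current::get).collect(Collectors.toList()));
            toAdd.addAll(expected.keySet().stream().filter(k -> !current.containsKey(k))
                    .map(expected::get).collect(Collectors.toList()));
        }

        public static void main(String[] args) {
            List<String> toAdd = new ArrayList<>();
            List<String> toDel = new ArrayList<>();
            diff(toAdd, toDel,
                    () -> Map.of("10.0.0.1:9050", "beA", "10.0.0.2:9050", "beB"),  // local view
                    () -> Map.of("10.0.0.2:9050", "beB", "10.0.0.3:9050", "beC")); // meta service view
            System.out.println("toAdd=" + toAdd + " toDel=" + toDel); // toAdd=[beC] toDel=[beA]
        }
    }
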
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index d6e6b51f2b2..888eee5a0c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -18,20 +18,331 @@
package org.apache.doris.cloud.catalog;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.NodeInfoPB;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
import org.apache.doris.common.io.CountingDataOutputStream;
+import org.apache.doris.common.util.HttpURLUtil;
+import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.httpv2.meta.MetaBaseAction;
+import org.apache.doris.persist.Storage;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInputStream;
+import java.io.File;
import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
public class CloudEnv extends Env {
private static final Logger LOG = LogManager.getLogger(CloudEnv.class);
+ private CloudClusterChecker cloudClusterCheck;
+
public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
+ this.cloudClusterCheck = new CloudClusterChecker();
+ }
+
+ protected void startMasterOnlyDaemonThreads() {
+ LOG.info("start cloud Master only daemon threads");
+ super.startMasterOnlyDaemonThreads();
+ cloudClusterCheck.start();
+ }
+
+    public static String genFeNodeNameFromMeta(String host, int port, long timeMs) {
+ return host + "_" + port + "_" + timeMs;
+ }
+
+    private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
+        // get helperNodes from ms
+        Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster(
+                Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return null;
+        }
+        LOG.info("get cluster response from meta service {}", response);
+        // Note: get_cluster interface cluster(option -> repeated), so it has at least one cluster.
+        if (response.getClusterCount() == 0) {
+            LOG.warn("meta service error , return cluster zero, plz check it, "
+                    + "cloud_unique_id={}, clusterId={}, response={}",
+                    Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
+            return null;
+        }
+        List<Cloud.NodeInfoPB> allNodes = response.getCluster(0).getNodesList()
+                .stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());
+
+        helperNodes.clear();
+        helperNodes.addAll(allNodes.stream()
+                .filter(nodeInfoPB -> nodeInfoPB.getNodeType() == NodeInfoPB.NodeType.FE_MASTER)
+                .map(nodeInfoPB -> new HostInfo(
+                        Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(), nodeInfoPB.getEditLogPort()))
+                .collect(Collectors.toList()));
+        // check only have one master node.
+        Preconditions.checkState(helperNodes.size() == 1);
+
+        Optional<NodeInfoPB> local = allNodes.stream().filter(n -> ((Config.enable_fqdn_mode ? n.getHost() : n.getIp())
+                + "_" + n.getEditLogPort()).equals(selfNode.getIdent())).findAny();
+        return local.orElse(null);
+    }
+
+
+    protected void getClusterIdAndRole() throws IOException {
+        NodeInfoPB.NodeType type = NodeInfoPB.NodeType.UNKNOWN;
+        String feNodeNameFromMeta = "";
+        // cloud mode
+        while (true) {
+            Cloud.NodeInfoPB nodeInfoPB = null;
+            try {
+                nodeInfoPB = getLocalTypeFromMetaService();
+            } catch (Exception e) {
+                LOG.warn("failed to get local fe's type, sleep 5 s, try again. exception: {}", e.getMessage());
+            }
+            if (nodeInfoPB == null) {
+                LOG.warn("failed to get local fe's type, sleep 5 s, try again.");
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    LOG.warn("thread sleep Exception", e);
+                }
+                continue;
+            }
+            type = nodeInfoPB.getNodeType();
+            feNodeNameFromMeta = genFeNodeNameFromMeta(
+                    Config.enable_fqdn_mode ? nodeInfoPB.getHost() : nodeInfoPB.getIp(),
+                    nodeInfoPB.getEditLogPort(), nodeInfoPB.getCtime() * 1000);
+            break;
+        }
+
+        LOG.info("current fe's role is {}", type == NodeInfoPB.NodeType.FE_MASTER ? "MASTER" :
+                type == NodeInfoPB.NodeType.FE_OBSERVER ? "OBSERVER" : "UNKNOWN");
+        if (type == NodeInfoPB.NodeType.UNKNOWN) {
+            LOG.warn("type current not support, please check it");
+            System.exit(-1);
+        }
+        File roleFile = new File(super.imageDir, Storage.ROLE_FILE);
+        File versionFile = new File(super.imageDir, Storage.VERSION_FILE);
+
+        // if the helper node points to self, or the ROLE and VERSION files exist locally,
+        // get the node type from local
+        if ((type == NodeInfoPB.NodeType.FE_MASTER) || type != NodeInfoPB.NodeType.FE_OBSERVER
+                && (isMyself() || (roleFile.exists() && versionFile.exists()))) {
+            if (!isMyself()) {
+                LOG.info("find ROLE and VERSION file in local, ignore helper nodes: {}", helperNodes);
+            }
+
+            // check file integrity, if has.
+            if ((roleFile.exists() && !versionFile.exists()) || (!roleFile.exists() && versionFile.exists())) {
+                throw new IOException("role file and version file must both exist or both not exist. "
+                        + "please specific one helper node to recover. will exit.");
+            }
+ }
+
+            // ATTN:
+            // If the version file and role file do not exist and the helper node is itself,
+            // this should be the very beginning startup of the cluster, so we create ROLE and VERSION file,
+            // set isFirstTimeStartUp to true, and add itself to frontends list.
+            // If ROLE and VERSION file is deleted for some reason, we may arbitrarily start this node as
+            // FOLLOWER, which may cause UNDEFINED behavior.
+            // Everything may be OK if the origin role is exactly FOLLOWER,
+            // but if not, FE process will exit somehow.
+            Storage storage = new Storage(this.imageDir);
+            if (!roleFile.exists()) {
+                // The very first time to start the first node of the cluster.
+                // It should become a Master node (Master node's role is also FOLLOWER, which means electable)
+
+                // For compatibility. Because this is the very first time to start, so we arbitrarily choose
+                // a new name for this node
+                role = FrontendNodeType.FOLLOWER;
+                if (type == NodeInfoPB.NodeType.FE_MASTER) {
+                    nodeName = feNodeNameFromMeta;
+                } else {
+                    nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), false /* new style */);
+                }
+
+                storage.writeFrontendRoleAndNodeName(role, nodeName);
+                LOG.info("very first time to start this node. role: {}, node name: {}", role.name(), nodeName);
+            } else {
+                role = storage.getRole();
+                if (role == FrontendNodeType.REPLICA) {
+                    // for compatibility
+                    role = FrontendNodeType.FOLLOWER;
+                }
+
+                nodeName = storage.getNodeName();
+                if (Strings.isNullOrEmpty(nodeName)) {
+                    // In normal case, if ROLE file exist, role and nodeName should both exist.
+                    // But we will get an empty nodeName after upgrading.
+                    // So for forward compatibility, we use the "old-style" way of naming: "ip_port",
+                    // and update the ROLE file.
+                    if (type == NodeInfoPB.NodeType.FE_MASTER) {
+                        nodeName = feNodeNameFromMeta;
+                    } else {
+                        nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), true /* old style */);
+                    }
+                    storage.writeFrontendRoleAndNodeName(role, nodeName);
+                    LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName);
+                }
+                // Notice:
+                // With the introduction of FQDN, the nodeName is no longer bound to an IP address,
+                // so consistency is no longer checked here. Otherwise, the startup will fail.
+            }
+
+            Preconditions.checkNotNull(role);
+            Preconditions.checkNotNull(nodeName);
+
+            if (!versionFile.exists()) {
+                clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id;
+                token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
+                storage = new Storage(clusterId, token, this.imageDir);
+                storage.writeClusterIdAndToken();
+
+                isFirstTimeStartUp = true;
+                Frontend self = new Frontend(role, nodeName, selfNode.getHost(), selfNode.getPort());
+                // Set self alive to true, the BDBEnvironment.getReplicationGroupAdmin() will rely on this to get
+                // helper node, before the heartbeat thread is started.
+                self.setIsAlive(true);
+                // We don't need to check if frontends already contains self.
+                // frontends must be empty cause no image is loaded and no journal is replayed yet.
+                // And this frontend will be persisted later after opening bdbje environment.
+                frontends.put(nodeName, self);
+                LOG.info("add self frontend: {}", self);
+            } else {
+                clusterId = storage.getClusterID();
+                if (storage.getToken() == null) {
+                    token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
+                    LOG.info("new token={}", token);
+                    storage.setToken(token);
+                    storage.writeClusterIdAndToken();
+                } else {
+                    token = storage.getToken();
+                }
+                isFirstTimeStartUp = false;
+            }
+        } else {
+            // cloud mode, type == NodeInfoPB.NodeType.FE_OBSERVER
+            // try to get role and node name from helper node,
+            // this loop will not end until we get certain role type and name
+            while (true) {
+                if (!getFeNodeTypeAndNameFromHelpers()) {
+                    LOG.warn("current node is not added to the group. please add it first. "
+                            + "sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
+                    try {
+                        Thread.sleep(5000);
+                        continue;
+                    } catch (InterruptedException e) {
+                        LOG.warn("", e);
+                        System.exit(-1);
+                    }
+                }
+
+                if (role == FrontendNodeType.REPLICA) {
+                    // for compatibility
+                    role = FrontendNodeType.FOLLOWER;
+                }
+                break;
+            }
+
+            Preconditions.checkState(helperNodes.size() == 1);
+            Preconditions.checkNotNull(role);
+            Preconditions.checkNotNull(nodeName);
+
+            HostInfo rightHelperNode = helperNodes.get(0);
+
+            Storage storage = new Storage(this.imageDir);
+            if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
+                    || !roleFile.exists()) {
+                storage.writeFrontendRoleAndNodeName(role, nodeName);
+            }
+            if (!versionFile.exists()) {
+                // If the version file doesn't exist, download it from helper node
+                if (!getVersionFileFromHelper(rightHelperNode)) {
+                    throw new IOException("fail to download version file from "
+                            + rightHelperNode.getHost() + " will exit.");
+                }
+
+                // NOTE: cluster_id will be init when Storage object is constructed,
+                // so we new one.
+                storage = new Storage(this.imageDir);
+                clusterId = storage.getClusterID();
+                token = storage.getToken();
+                if (Strings.isNullOrEmpty(token)) {
+                    token = Config.auth_token;
+                }
+                LOG.info("get version file from helper, cluster id {}, token {}", clusterId, token);
+            } else {
+                // If the version file exist, read the cluster id and check the
+                // id with helper node to make sure they are identical
+                clusterId = storage.getClusterID();
+                token = storage.getToken();
+                LOG.info("check local cluster id {} and token via helper node", clusterId, token);
+                try {
+                    String url = "http://" + NetUtils
+                            .getHostPortInAccessibleFormat(rightHelperNode.getHost(), Config.http_port) + "/check";
+                    HttpURLConnection conn = HttpURLUtil.getConnectionWithNodeIdent(url);
+                    conn.setConnectTimeout(2 * 1000);
+                    conn.setReadTimeout(2 * 1000);
+                    String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID);
+                    int remoteClusterId = Integer.parseInt(clusterIdString);
+                    if (remoteClusterId != clusterId) {
+                        LOG.error("cluster id is not equal with helper node {}. will exit. remote:{}, local:{}",
+                                rightHelperNode.getHost(), clusterIdString, clusterId);
+                        throw new IOException(
+                                "cluster id is not equal with helper node "
+                                        + rightHelperNode.getHost() + ". will exit.");
+                    }
+                    String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN);
+                    if (token == null && remoteToken != null) {
+                        LOG.info("get token from helper node. token={}.", remoteToken);
+                        token = remoteToken;
+                        storage.writeClusterIdAndToken();
+                        storage.reload();
+                    }
+                    if (Config.enable_token_check) {
+                        Preconditions.checkNotNull(token);
+                        Preconditions.checkNotNull(remoteToken);
+                        if (!token.equals(remoteToken)) {
+                            throw new IOException(
+                                    "token is not equal with helper node " + rightHelperNode.getHost()
+                                            + ", local token " + token + ", remote token " + remoteToken + ". will exit.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IOException("fail to check cluster_id and token with helper node.", e);
+                }
+            }
+
+            getNewImage(rightHelperNode);
+        }
+
+        if (Config.cluster_id != -1 && clusterId != Config.cluster_id) {
+            throw new IOException("cluster id is not equal with config item cluster_id. will exit. "
+                    + "If you are in recovery mode, please also modify the cluster_id in 'doris-meta/image/VERSION'");
+        }
+
+        if (role.equals(FrontendNodeType.FOLLOWER)) {
+            isElectable = true;
+        } else {
+            isElectable = false;
+        }
+
+        Preconditions.checkState(helperNodes.size() == 1);
+        LOG.info("finished to get cluster id: {}, isElectable: {}, role: {}, node name: {}, token: {}",
+                clusterId, isElectable, role.name(), nodeName, token);
}
@Override
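
CloudEnv's startup replaces the base Env's helper-node discovery with a meta-service lookup, blocking until this FE's NodeInfoPB is visible. A condensed sketch of that retry-until-known pattern (the structure mirrors the loop above, but this is a simplified illustration, not the committed code):

    // Poll the meta service until this FE's record shows up, sleeping 5s between tries.
    Cloud.NodeInfoPB waitForSelf() throws InterruptedException {
        while (true) {
            try {
                Cloud.NodeInfoPB self = getLocalTypeFromMetaService(); // null until registered
                if (self != null) {
                    return self;
                }
            } catch (Exception e) {
                // transient RPC failure: fall through and retry
            }
            Thread.sleep(5000);
        }
    }
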
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 5ae16c9f35b..681f7e195e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -17,18 +17,36 @@
package org.apache.doris.cloud.system;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.Tag;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
public class CloudSystemInfoService extends SystemInfoService {
+    private static final Logger LOG = LogManager.getLogger(CloudSystemInfoService.class);
@Override
public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
@@ -39,5 +57,196 @@ public class CloudSystemInfoService extends SystemInfoService {
return Maps.newHashMap();
}
+    /**
+     * Gets cloud cluster from remote with either clusterId or clusterName
+     *
+     * @param clusterName cluster name
+     * @param clusterId cluster id
+     * @return
+     */
+    public static Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) {
+        Cloud.GetClusterRequest.Builder builder = Cloud.GetClusterRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id)
+                .setClusterName(clusterName).setClusterId(clusterId).setMysqlUserName(userName);
+        final Cloud.GetClusterRequest pRequest = builder.build();
+        Cloud.GetClusterResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().getCluster(pRequest);
+            return response;
+        } catch (RpcException e) {
+            LOG.warn("rpcToMetaGetClusterInfo exception: {}", e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    public synchronized void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+        // Deduplicate and validate
+        if (toAdd.size() == 0 && toDel.size() == 0) {
+            LOG.info("nothing to do");
+            return;
+        }
+        Set<String> existedBes = idToBackendRef.entrySet().stream().map(i -> i.getValue())
+                .map(i -> i.getHost() + ":" + i.getHeartbeatPort())
+                .collect(Collectors.toSet());
+        LOG.debug("deduplication existedBes={}", existedBes);
+        LOG.debug("before deduplication toAdd={} toDel={}", toAdd, toDel);
+        toAdd = toAdd.stream().filter(i -> !existedBes.contains(i.getHost() + ":" + i.getHeartbeatPort()))
+                .collect(Collectors.toList());
+        toDel = toDel.stream().filter(i -> existedBes.contains(i.getHost() + ":" + i.getHeartbeatPort()))
+                .collect(Collectors.toList());
+        LOG.debug("after deduplication toAdd={} toDel={}", toAdd, toDel);
+
+        Map<String, List<Backend>> existedHostToBeList = idToBackendRef.values().stream().collect(Collectors.groupingBy(
+                Backend::getHost));
+        for (Backend be : toAdd) {
+            Env.getCurrentEnv().getEditLog().logAddBackend(be);
+            LOG.info("added cloud backend={} ", be);
+            // backends is changed, regenerated tablet number metrics
+            MetricRepo.generateBackendsTabletMetrics();
+
+            String host = be.getHost();
+            if (existedHostToBeList.keySet().contains(host)) {
+                if (be.isSmoothUpgradeDst()) {
+                    LOG.info("a new BE process will start on the existed node for smooth upgrading");
+                    int beNum = existedHostToBeList.get(host).size();
+                    Backend colocatedBe = existedHostToBeList.get(host).get(0);
+                    if (beNum != 1) {
+                        LOG.warn("find multiple co-located BEs, num: {}, select the 1st {} as migration src", beNum,
+                                colocatedBe.getId());
+                    }
+                    colocatedBe.setSmoothUpgradeSrc(true);
+                    handleNewBeOnSameNode(colocatedBe, be);
+                } else {
+                    LOG.warn("a new BE process will start on the existed node, it should not happend unless testing");
+                }
+            }
+        }
+        for (Backend be : toDel) {
+            // drop be, set it not alive
+            be.setAlive(false);
+            be.setLastMissingHeartbeatTime(System.currentTimeMillis());
+            Env.getCurrentEnv().getEditLog().logDropBackend(be);
+            LOG.info("dropped cloud backend={}, and lastMissingHeartbeatTime={}", be, be.getLastMissingHeartbeatTime());
+            // backends is changed, regenerated tablet number metrics
+            MetricRepo.generateBackendsTabletMetrics();
+        }
+
+        // Update idToBackendRef
+        Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+        toAdd.forEach(i -> copiedBackends.put(i.getId(), i));
+        toDel.forEach(i -> copiedBackends.remove(i.getId()));
+        ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends);
+        idToBackendRef = newIdToBackend;
+
+        // Update idToReportVersionRef
+        Map<Long, AtomicLong> copiedReportVersions = Maps.newHashMap(idToReportVersionRef);
+        toAdd.forEach(i -> copiedReportVersions.put(i.getId(), new AtomicLong(0L)));
+        toDel.forEach(i -> copiedReportVersions.remove(i.getId()));
+        ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions);
+        idToReportVersionRef = newIdToReportVersion;
+
+        updateCloudClusterMap(toAdd, toDel);
+    }
+
+    private void handleNewBeOnSameNode(Backend oldBe, Backend newBe) {
+        LOG.info("new BE {} starts on the same node as existing BE {}", newBe.getId(), oldBe.getId());
+        // TODO(merge-cloud): add it when has CloudTabletRebalancer.
+        // Env.getCurrentEnv().getCloudTabletRebalancer().addTabletMigrationTask(oldBe.getId(), newBe.getId());
+    }
+
+    public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> toDel) {
+        lock.lock();
+        Set<String> clusterNameSet = new HashSet<>();
+        for (Backend b : toAdd) {
+            String clusterName = b.getCloudClusterName();
+            String clusterId = b.getCloudClusterId();
+            if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
+                continue;
+            }
+            clusterNameSet.add(clusterName);
+            if (clusterNameSet.size() != 1) {
+                LOG.warn("toAdd be list have multi clusterName, please check, Set: {}", clusterNameSet);
+            }
+
+            clusterNameToId.put(clusterName, clusterId);
+            List<Backend> be = clusterIdToBackend.get(clusterId);
+            if (be == null) {
+                be = new ArrayList<>();
+                clusterIdToBackend.put(clusterId, be);
+                MetricRepo.registerClusterMetrics(clusterName, clusterId);
+            }
+            Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort())
+                    .collect(Collectors.toSet());
+            // Deduplicate
+            // TODO(gavin): consider vpc
+            boolean alreadyExisted = existed.contains(b.getHost() + ":" + b.getHeartbeatPort());
+            if (alreadyExisted) {
+                LOG.info("BE already existed, clusterName={} clusterId={} backendNum={} backend={}",
+                        clusterName, clusterId, be.size(), b);
+                continue;
+            }
+            be.add(b);
+            LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+                    clusterName, clusterId, be.size(), b);
+        }
+
+        for (Backend b : toDel) {
+            String clusterName = b.getCloudClusterName();
+            String clusterId = b.getCloudClusterId();
+            // We actually don't care about cluster name here
+            if (clusterName.isEmpty() || clusterId.isEmpty()) {
+                LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName);
+                continue;
+            }
+            List<Backend> be = clusterIdToBackend.get(clusterId);
+            if (be == null) {
+                LOG.warn("try to remove a non-existing cluster, clusterId={} clusterName={}",
+                        clusterId, clusterName);
+                continue;
+            }
+            Set<Long> d = toDel.stream().map(i -> i.getId()).collect(Collectors.toSet());
+            be = be.stream().filter(i -> !d.contains(i.getId())).collect(Collectors.toList());
+            // ATTN: clusterId may have zero nodes
+            clusterIdToBackend.replace(clusterId, be);
+            // such as dropCluster, but no lock
+            // ATTN: Empty clusters are treated as dropped clusters.
+            if (be.size() == 0) {
+                LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
+                boolean succ = clusterNameToId.remove(clusterName, clusterId);
+                if (!succ) {
+                    LOG.warn("impossible, somewhere err, clusterNameToId {}, "
+                            + "want remove cluster name {}, cluster id {}", clusterNameToId, clusterName, clusterId);
+                }
+                clusterIdToBackend.remove(clusterId);
+            }
+            LOG.info("update (del) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
+                    clusterName, clusterId, be.size(), b);
+        }
+        lock.unlock();
+    }
+
+
+    public static synchronized void updateFrontends(List<Frontend> toAdd,
+                                                    List<Frontend> toDel) throws DdlException {
+        LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
+        String masterIp = Env.getCurrentEnv().getMasterHost();
+        for (Frontend fe : toAdd) {
+            if (masterIp.equals(fe.getHost())) {
+                continue;
+            }
+            Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER,
+                    fe.getHost(), fe.getEditLogPort(), fe.getNodeName());
+            LOG.info("added cloud frontend={} ", fe);
+        }
+        for (Frontend fe : toDel) {
+            if (masterIp.equals(fe.getHost())) {
+                continue;
+            }
+            Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, fe.getHost(), fe.getEditLogPort());
+            LOG.info("dropped cloud frontend={} ", fe);
+        }
+    }
}
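
Note that updateCloudBackends filters both lists against the endpoints already registered before doing anything, so a repeated or replayed meta-service response is effectively idempotent. A tiny standalone illustration of that endpoint-keyed filter (the endpoints are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    class DedupSketch {
        public static void main(String[] args) {
            Set<String> existed = new HashSet<>(Arrays.asList("10.0.0.1:9050", "10.0.0.2:9050"));
            List<String> toAdd = Arrays.asList("10.0.0.2:9050", "10.0.0.3:9050");
            List<String> toDel = Arrays.asList("10.0.0.1:9050", "10.0.0.9:9050");
            // an endpoint already present is not added again; an unknown endpoint is not dropped
            System.out.println(toAdd.stream().filter(e -> !existed.contains(e))
                    .collect(Collectors.toList())); // [10.0.0.3:9050]
            System.out.println(toDel.stream().filter(existed::contains)
                    .collect(Collectors.toList())); // [10.0.0.1:9050]
        }
    }
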
diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
index 79b8e9392d2..6150530c8cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
@@ -543,10 +543,10 @@ public class DeployManager extends MasterDaemon {
try {
switch (nodeType) {
case ELECTABLE:
-                    env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort);
+                    env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort, "");
                     break;
                 case OBSERVER:
-                    env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort);
+                    env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort, "");
break;
case BACKEND:
case BACKEND_CN:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index ba444474628..15d8e51e2c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -661,7 +661,7 @@ public class NodeAction extends RestBaseController {
}
         HostInfo info = SystemInfoService.getHostAndPort(reqInfo.getHostPort());
         if ("ADD".equals(action)) {
-            currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort());
+            currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort(), "");
         } else if ("DROP".equals(action)) {
             currentEnv.dropFrontend(frontendNodeType, info.getHost(), info.getPort());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 4ad8fdda671..667290132aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -23,6 +23,7 @@ import org.apache.doris.alter.AlterJobV2.JobType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.load.EtlJobType;
@@ -42,14 +43,17 @@ import org.apache.doris.transaction.TransactionStatus;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -66,6 +70,7 @@ public final class MetricRepo {
public static final String TABLET_NUM = "tablet_num";
     public static final String TABLET_MAX_COMPACTION_SCORE = "tablet_max_compaction_score";
+ public static final String CLOUD_TAG = "cloud";
public static LongCounterMetric COUNTER_REQUEST_ALL;
public static LongCounterMetric COUNTER_QUERY_ALL;
@@ -127,10 +132,111 @@ public final class MetricRepo {
public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
+ // cloud metrics
+ public static ConcurrentHashMap<String, LongCounterMetric>
+ CLOUD_CLUSTER_COUNTER_REQUEST_ALL = new ConcurrentHashMap<>();
+ public static ConcurrentHashMap<String, LongCounterMetric>
+ CLOUD_CLUSTER_COUNTER_QUERY_ALL = new ConcurrentHashMap<>();
+ public static ConcurrentHashMap<String, LongCounterMetric>
+ CLOUD_CLUSTER_COUNTER_QUERY_ERR = new ConcurrentHashMap<>();
+
+ public static ConcurrentHashMap<String, GaugeMetricImpl<Double>>
+ CLOUD_CLUSTER_GAUGE_QUERY_PER_SECOND = new ConcurrentHashMap<>();
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_UPLOAD_PER_SECOND;
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_UPLOAD_ERR_RATE;
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_QUERY_PER_SECOND;
+
+    public static GaugeMetricImpl<Double> GAUGE_HTTP_COPY_INTO_QUERY_ERR_RATE;
+
+ public static ConcurrentHashMap<String, GaugeMetricImpl<Double>>
+ CLOUD_CLUSTER_GAUGE_REQUEST_PER_SECOND = new ConcurrentHashMap<>();
+ public static ConcurrentHashMap<String, GaugeMetricImpl<Double>>
+ CLOUD_CLUSTER_GAUGE_QUERY_ERR_RATE = new ConcurrentHashMap<>();
+
+ public static ConcurrentHashMap<String, Histogram>
+ CLOUD_CLUSTER_HISTO_QUERY_LATENCY = new ConcurrentHashMap<>();
+
+ public static Map<String, GaugeMetricImpl<Integer>>
+ CLOUD_CLUSTER_BACKEND_ALIVE = new HashMap<>();
+ public static Map<String, GaugeMetricImpl<Long>>
+ CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL = new HashMap<>();
+
+    private static Map<Pair<EtlJobType, JobState>, Long> loadJobNum = Maps.newHashMap();
+ private static Long lastCalcLoadJobTime = 0L;
+
     private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
             "metric-timer-pool", true);
private static MetricCalculator metricCalculator = new MetricCalculator();
+    public static void registerClusterMetrics(String clusterName, String clusterId) {
+        CLOUD_CLUSTER_COUNTER_REQUEST_ALL.computeIfAbsent(clusterName, key -> {
+            LongCounterMetric counterRequestAll = new LongCounterMetric("request_total", MetricUnit.REQUESTS,
+                    "total request");
+            counterRequestAll.addLabel(new MetricLabel("cluster_id", clusterId));
+            counterRequestAll.addLabel(new MetricLabel("cluster_name", key));
+            DORIS_METRIC_REGISTER.addMetrics(counterRequestAll);
+            return counterRequestAll;
+        });
+
+        MetricRepo.CLOUD_CLUSTER_COUNTER_QUERY_ALL.computeIfAbsent(clusterName, key -> {
+            LongCounterMetric counterQueryAll = new LongCounterMetric("query_total", MetricUnit.REQUESTS,
+                    "total query");
+            counterQueryAll.addLabel(new MetricLabel("cluster_id", clusterId));
+            counterQueryAll.addLabel(new MetricLabel("cluster_name", key));
+            DORIS_METRIC_REGISTER.addMetrics(counterQueryAll);
+            return counterQueryAll;
+        });
+
+        CLOUD_CLUSTER_COUNTER_QUERY_ERR.computeIfAbsent(clusterName, key -> {
+            LongCounterMetric counterQueryErr = new LongCounterMetric("query_err", MetricUnit.REQUESTS,
+                    "total error query");
+            counterQueryErr.addLabel(new MetricLabel("cluster_id", clusterId));
+            counterQueryErr.addLabel(new MetricLabel("cluster_name", key));
+            DORIS_METRIC_REGISTER.addMetrics(counterQueryErr);
+            return counterQueryErr;
+        });
+
+        CLOUD_CLUSTER_GAUGE_QUERY_PER_SECOND.computeIfAbsent(clusterName, key -> {
+            GaugeMetricImpl<Double> gaugeQueryPerSecond = new GaugeMetricImpl<>("qps", MetricUnit.NOUNIT,
+                    "query per second");
+            gaugeQueryPerSecond.addLabel(new MetricLabel("cluster_id", clusterId));
+            gaugeQueryPerSecond.addLabel(new MetricLabel("cluster_name", clusterName));
+            gaugeQueryPerSecond.setValue(0.0);
+            DORIS_METRIC_REGISTER.addMetrics(gaugeQueryPerSecond);
+            return gaugeQueryPerSecond;
+        }).setValue(0.0);
+
+        CLOUD_CLUSTER_GAUGE_REQUEST_PER_SECOND.computeIfAbsent(clusterName, key -> {
+            GaugeMetricImpl<Double> gaugeRequestPerSecond = new GaugeMetricImpl<>("rps", MetricUnit.NOUNIT,
+                    "request per second");
+            gaugeRequestPerSecond.addLabel(new MetricLabel("cluster_id", clusterId));
+            gaugeRequestPerSecond.addLabel(new MetricLabel("cluster_name", clusterName));
+            gaugeRequestPerSecond.setValue(0.0);
+            DORIS_METRIC_REGISTER.addMetrics(gaugeRequestPerSecond);
+            return gaugeRequestPerSecond;
+        }).setValue(0.0);
+
+        CLOUD_CLUSTER_GAUGE_QUERY_ERR_RATE.computeIfAbsent(clusterName, key -> {
+            GaugeMetricImpl<Double> gaugeQueryErrRate = new GaugeMetricImpl<>("query_err_rate",
+                    MetricUnit.NOUNIT, "query error rate");
+            gaugeQueryErrRate.addLabel(new MetricLabel("cluster_id", clusterId));
+            gaugeQueryErrRate.addLabel(new MetricLabel("cluster_name", clusterName));
+            gaugeQueryErrRate.setValue(0.0);
+            DORIS_METRIC_REGISTER.addMetrics(gaugeQueryErrRate);
+            return gaugeQueryErrRate;
+        }).setValue(0.0);
+
+        CLOUD_CLUSTER_HISTO_QUERY_LATENCY.computeIfAbsent(clusterName, key -> {
+            Histogram histoQueryLatency = MetricRepo.METRIC_REGISTER.histogram(
+                    MetricRegistry.name("query", "latency", "ms", MetricRepo.CLOUD_TAG, key));
+            return histoQueryLatency;
+        });
+    }
+
     // init() should only be called after catalog is constructed.
public static synchronized void init() {
if (isInit) {
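
registerClusterMetrics relies on ConcurrentHashMap.computeIfAbsent so that each per-cluster metric is created and added to the registry exactly once, however many times a cluster is rediscovered by the checker. A minimal sketch of that register-once pattern (AtomicLong stands in for the real metric types; names are illustrative):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    class RegisterOnceSketch {
        static final ConcurrentHashMap<String, AtomicLong> QUERY_TOTAL = new ConcurrentHashMap<>();

        static void register(String clusterName) {
            // the factory runs only on first sight of clusterName, so labels would be
            // attached and the metric registered exactly once in the real code
            QUERY_TOTAL.computeIfAbsent(clusterName, key -> new AtomicLong());
        }

        public static void main(String[] args) {
            register("cluster_a");
            register("cluster_a"); // no-op: same counter instance as before
            QUERY_TOTAL.get("cluster_a").incrementAndGet();
            System.out.println(QUERY_TOTAL.get("cluster_a").get()); // 1
        }
    }
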
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 2f8faa06b1f..f672770c4fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -213,6 +213,10 @@ public class Backend implements Writable {
return id;
}
+ public String getAddress() {
+ return host + ":" + heartbeatPort;
+ }
+
public String getHost() {
return host;
}
@@ -370,6 +374,10 @@ public class Backend implements Writable {
return lastMissingHeartbeatTime;
}
+ public void setLastMissingHeartbeatTime(long lastMissingHeartbeatTime) {
+ this.lastMissingHeartbeatTime = lastMissingHeartbeatTime;
+ }
+
     // Backend process epoch, is used to tag a backend process
// Currently it is always equal to be start time, even during oplog replay.
public long getProcessEpoch() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 62511e25e47..d4140c2b01f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -86,19 +86,19 @@ public class SystemInfoService {
     public static final String NOT_USING_VALID_CLUSTER_MSG = "Not using valid cloud clusters, "
             + "please use a cluster before issuing any queries";
-    private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
-    private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
+    protected volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
+    protected volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
     // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk of inconsistency
     // use exclusive lock to make sure only one thread can change clusterIdToBackend and clusterNameToId
-    private ReentrantLock lock = new ReentrantLock();
+    protected ReentrantLock lock = new ReentrantLock();
// for show cluster and cache user owned cluster
// mysqlUserName -> List of ClusterPB
     private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = ImmutableMap.of();
     // clusterId -> List<Backend>
-    private Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
+    protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
     // clusterName -> clusterId
-    private Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
+    protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
 
     private volatile ImmutableMap<Long, DiskInfo> pathHashToDiskInfoRef = ImmutableMap.of();
@@ -1155,4 +1155,8 @@ public class SystemInfoService {
}
this.instanceStatus = instanceStatus;
}
+
+    public synchronized void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) {
+ LOG.warn("Not cloud mode, should not be here");
+ }
}