This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3dade1c03c6 [feature](merge-cloud) Fix cloud cluster checker can't get
backends (#30467)
3dade1c03c6 is described below
commit 3dade1c03c63ae59658aecbc5e42cd8d1d152776
Author: deardeng <[email protected]>
AuthorDate: Mon Jan 29 20:06:58 2024 +0800
[feature](merge-cloud) Fix cloud cluster checker can't get backends (#30467)
---
.../doris/cloud/catalog/CloudClusterChecker.java | 157 +++++++++++----------
1 file changed, 84 insertions(+), 73 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 527909c7bf0..5288cd73ea7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -20,6 +20,7 @@ package org.apache.doris.cloud.catalog;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB.Type;
import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -291,78 +292,9 @@ public class CloudClusterChecker extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- Map<String, List<Backend>> clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
- //rpc to ms, to get mysql user can use cluster_id
- // NOTE: rpc args all empty, use cluster_unique_id to get a instance's
all cluster info.
- Cloud.GetClusterResponse response =
CloudSystemInfoService.getCloudCluster("", "", "");
- if (!response.hasStatus() || !response.getStatus().hasCode()
- || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
- && response.getStatus().getCode() !=
MetaServiceCode.CLUSTER_NOT_FOUND)) {
- LOG.warn("failed to get cloud cluster due to incomplete response, "
- + "cloud_unique_id={}, response={}",
Config.cloud_unique_id, response);
- } else {
- // clusterId -> clusterPB
- Map<String, ClusterPB> remoteClusterIdToPB = new HashMap<>();
- Set<String> localClusterIds = clusterIdToBackend.keySet();
-
- try {
- // cluster_ids diff remote <clusterId, nodes> and local
<clusterId, nodes>
- // remote - local > 0, add bes to local
- checkToAddCluster(remoteClusterIdToPB, localClusterIds);
-
- // local - remote > 0, drop bes from local
- checkToDelCluster(remoteClusterIdToPB, localClusterIds,
clusterIdToBackend);
-
- if (remoteClusterIdToPB.keySet().size() !=
clusterIdToBackend.keySet().size()) {
- LOG.warn("impossible cluster id size not match, check it
local {}, remote {}",
- clusterIdToBackend, remoteClusterIdToPB);
- }
- // clusterID local == remote, diff nodes
- checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
-
- // check mem map
- checkFeNodesMapValid();
- } catch (Exception e) {
- LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
- }
- }
-
- // Metric
- clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
- Map<String, String> clusterNameToId =
Env.getCurrentSystemInfo().getCloudClusterNameToId();
- for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
- long aliveNum = 0L;
- List<Backend> bes = clusterIdToBackend.get(entry.getValue());
- if (bes == null || bes.size() == 0) {
- LOG.info("cant get be nodes by cluster {}, bes {}", entry,
bes);
- continue;
- }
- for (Backend backend : bes) {
-
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(),
key -> {
- GaugeMetricImpl<Integer> backendAlive = new
GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
- "backend alive or not");
- backendAlive.addLabel(new MetricLabel("cluster_id",
entry.getValue()));
- backendAlive.addLabel(new MetricLabel("cluster_name",
entry.getKey()));
- backendAlive.addLabel(new MetricLabel("address", key));
- MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
- return backendAlive;
- }).setValue(backend.isAlive() ? 1 : 0);
- aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
- }
-
-
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(),
key -> {
- GaugeMetricImpl<Long> backendAliveTotal = new
GaugeMetricImpl<>("backend_alive_total",
- MetricUnit.NOUNIT, "backend alive num in cluster");
- backendAliveTotal.addLabel(new MetricLabel("cluster_id",
entry.getValue()));
- backendAliveTotal.addLabel(new MetricLabel("cluster_name",
key));
- MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
- return backendAliveTotal;
- }).setValue(aliveNum);
- }
-
- LOG.info("daemon cluster get cluster info succ, current
cloudClusterIdToBackendMap: {}",
- Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
- getObserverFes();
+ getCloudBackends();
+ updateCloudMetrics();
+ getCloudObserverFes();
}
private void checkFeNodesMapValid() {
@@ -403,7 +335,7 @@ public class CloudClusterChecker extends MasterDaemon {
}
}
- private void getObserverFes() {
+ private void getCloudObserverFes() {
Cloud.GetClusterResponse response = CloudSystemInfoService
.getCloudCluster(Config.cloud_sql_server_cluster_name,
Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
@@ -470,5 +402,84 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.warn("update cloud frontends exception e: {}, msg: {}", e,
e.getMessage());
}
}
+
+ private void getCloudBackends() {
+ Map<String, List<Backend>> clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+ //rpc to ms, to get mysql user can use cluster_id
+ // NOTE: rpc args all empty, use cluster_unique_id to get a instance's
all cluster info.
+ Cloud.GetClusterResponse response =
CloudSystemInfoService.getCloudCluster("", "", "");
+ if (!response.hasStatus() || !response.getStatus().hasCode()
+ || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
+ && response.getStatus().getCode() !=
MetaServiceCode.CLUSTER_NOT_FOUND)) {
+ LOG.warn("failed to get cloud cluster due to incomplete response, "
+ + "cloud_unique_id={}, response={}",
Config.cloud_unique_id, response);
+ return;
+ }
+ Set<String> localClusterIds = clusterIdToBackend.keySet();
+ // clusterId -> clusterPB
+ Map<String, ClusterPB> remoteClusterIdToPB =
response.getClusterList().stream()
+ .filter(c -> c.getType() != Type.SQL)
+ .collect(Collectors.toMap(ClusterPB::getClusterId, clusterPB
-> clusterPB));
+ LOG.info("get cluster info clusterIds: {}", remoteClusterIdToPB);
+
+ try {
+ // cluster_ids diff remote <clusterId, nodes> and local
<clusterId, nodes>
+ // remote - local > 0, add bes to local
+ checkToAddCluster(remoteClusterIdToPB, localClusterIds);
+
+ // local - remote > 0, drop bes from local
+ checkToDelCluster(remoteClusterIdToPB, localClusterIds,
clusterIdToBackend);
+
+ if (remoteClusterIdToPB.keySet().size() !=
clusterIdToBackend.keySet().size()) {
+ LOG.warn("impossible cluster id size not match, check it local
{}, remote {}",
+ clusterIdToBackend, remoteClusterIdToPB);
+ }
+ // clusterID local == remote, diff nodes
+ checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
+
+ // check mem map
+ checkFeNodesMapValid();
+ } catch (Exception e) {
+ LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
+
+ }
+ LOG.info("daemon cluster get cluster info succ, current
cloudClusterIdToBackendMap: {}",
+ Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
+ }
+
+ private void updateCloudMetrics() {
+ // Metric
+ Map<String, List<Backend>> clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+ Map<String, String> clusterNameToId =
Env.getCurrentSystemInfo().getCloudClusterNameToId();
+ for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+ long aliveNum = 0L;
+ List<Backend> bes = clusterIdToBackend.get(entry.getValue());
+ if (bes == null || bes.size() == 0) {
+ LOG.info("cant get be nodes by cluster {}, bes {}", entry,
bes);
+ continue;
+ }
+ for (Backend backend : bes) {
+
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(),
key -> {
+ GaugeMetricImpl<Integer> backendAlive = new
GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
+ "backend alive or not");
+ backendAlive.addLabel(new MetricLabel("cluster_id",
entry.getValue()));
+ backendAlive.addLabel(new MetricLabel("cluster_name",
entry.getKey()));
+ backendAlive.addLabel(new MetricLabel("address", key));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
+ return backendAlive;
+ }).setValue(backend.isAlive() ? 1 : 0);
+ aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
+ }
+
+
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(),
key -> {
+ GaugeMetricImpl<Long> backendAliveTotal = new
GaugeMetricImpl<>("backend_alive_total",
+ MetricUnit.NOUNIT, "backend alive num in cluster");
+ backendAliveTotal.addLabel(new MetricLabel("cluster_id",
entry.getValue()));
+ backendAliveTotal.addLabel(new MetricLabel("cluster_name",
key));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
+ return backendAliveTotal;
+ }).setValue(aliveNum);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]