This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b4ecfb5728 [feature](merge-cloud) Fix cloud mode can't drop backends
and refacto… (#30534)
7b4ecfb5728 is described below
commit 7b4ecfb5728e27300354803ff531756e0af5d17c
Author: deardeng <[email protected]>
AuthorDate: Wed Jan 31 19:53:54 2024 +0800
[feature](merge-cloud) Fix cloud mode can't drop backends and refacto…
(#30534)
---
.../main/java/org/apache/doris/catalog/Env.java | 24 +-
.../doris/cloud/catalog/CloudClusterChecker.java | 33 +-
.../org/apache/doris/cloud/catalog/CloudEnv.java | 23 +-
.../cloud/catalog/CloudInstanceStatusChecker.java | 11 +-
.../apache/doris/cloud/catalog/CloudReplica.java | 21 +-
.../doris/cloud/load/CloudBrokerLoadJob.java | 9 +-
.../doris/cloud/system/CloudSystemInfoService.java | 331 +++++++++++++++++----
.../java/org/apache/doris/qe/ConnectContext.java | 6 +-
.../org/apache/doris/system/SystemInfoService.java | 177 -----------
9 files changed, 336 insertions(+), 299 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 074c5cb7961..5dd1740aeaa 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -69,7 +69,6 @@ import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
import org.apache.doris.analysis.ReplacePartitionClause;
-import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.RollupRenameClause;
import org.apache.doris.analysis.SetType;
@@ -425,7 +424,7 @@ public class Env {
private JournalObservable journalObservable;
- private SystemInfoService systemInfo;
+ protected SystemInfoService systemInfo;
private HeartbeatMgr heartbeatMgr;
private TabletInvertedIndex tabletInvertedIndex;
private ColocateTableIndex colocateTableIndex;
@@ -4999,27 +4998,6 @@ public class Env {
this.alter.getClusterHandler().cancel(stmt);
}
- public void checkCloudClusterPriv(String clusterName) throws DdlException {
- // check resource usage privilege
- if
(!Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
- clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
- throw new DdlException("USAGE denied to user "
- + ConnectContext.get().getQualifiedUser() + "'@'" +
ConnectContext.get().getRemoteIP()
- + "' for cloud cluster '" + clusterName + "'",
ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
- }
-
- if
(!Env.getCurrentSystemInfo().getCloudClusterNames().contains(clusterName)) {
- LOG.debug("current instance does not have a cluster name :{}",
clusterName);
- throw new DdlException(String.format("Cluster %s not exist",
clusterName),
- ErrorCode.ERR_CLOUD_CLUSTER_ERROR);
- }
- }
-
- public static void waitForAutoStart(final String clusterName) throws
DdlException {
- // TODO: merge from cloud.
- throw new DdlException("Env.waitForAutoStart unimplemented");
- }
-
// Switch catalog of this sesseion.
public void changeCatalog(ConnectContext ctx, String catalogName) throws
DdlException {
CatalogIf catalogIf = catalogMgr.getCatalogNullable(catalogName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 5288cd73ea7..50f29bdc4f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -52,8 +52,11 @@ import java.util.stream.Collectors;
public class CloudClusterChecker extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(CloudClusterChecker.class);
- public CloudClusterChecker() {
+ private CloudSystemInfoService cloudSystemInfoService;
+
+ public CloudClusterChecker(CloudSystemInfoService cloudSystemInfoService) {
super("cloud cluster check",
Config.cloud_cluster_check_interval_second * 1000L);
+ this.cloudSystemInfoService = cloudSystemInfoService;
}
/**
@@ -117,7 +120,7 @@ public class CloudClusterChecker extends MasterDaemon {
b.setTagMap(newTagMap);
toAdd.add(b);
}
- Env.getCurrentSystemInfo().updateCloudBackends(toAdd, new
ArrayList<>());
+ cloudSystemInfoService.updateCloudBackends(toAdd, new
ArrayList<>());
}
);
}
@@ -133,15 +136,15 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.debug("begin to drop clusterId: {}", delId);
List<Backend> toDel =
new
ArrayList<>(finalClusterIdToBackend.getOrDefault(delId, new ArrayList<>()));
- Env.getCurrentSystemInfo().updateCloudBackends(new
ArrayList<>(), toDel);
+ cloudSystemInfoService.updateCloudBackends(new ArrayList<>(),
toDel);
// del clusterName
- String delClusterName =
Env.getCurrentSystemInfo().getClusterNameByClusterId(delId);
+ String delClusterName =
cloudSystemInfoService.getClusterNameByClusterId(delId);
if (delClusterName.isEmpty()) {
LOG.warn("can't get delClusterName, clusterId: {}, plz
check", delId);
return;
}
// del clusterID
- Env.getCurrentSystemInfo().dropCluster(delId, delClusterName);
+ cloudSystemInfoService.dropCluster(delId, delClusterName);
}
);
}
@@ -202,12 +205,12 @@ public class CloudClusterChecker extends MasterDaemon {
// change all be's cluster_name
currentBes.forEach(b -> b.setCloudClusterName(newClusterName));
// update clusterNameToId
-
Env.getCurrentSystemInfo().updateClusterNameToId(newClusterName,
currentClusterName, cid);
+ cloudSystemInfoService.updateClusterNameToId(newClusterName,
currentClusterName, cid);
// update tags
currentBes.forEach(b ->
Env.getCurrentEnv().getEditLog().logModifyBackend(b));
}
- String currentClusterStatus =
Env.getCurrentSystemInfo().getCloudStatusById(cid);
+ String currentClusterStatus =
cloudSystemInfoService.getCloudStatusById(cid);
// For old versions that do no have status field set
ClusterStatus clusterStatus = cp.hasClusterStatus() ?
cp.getClusterStatus() : ClusterStatus.NORMAL;
@@ -286,7 +289,7 @@ public class CloudClusterChecker extends MasterDaemon {
continue;
}
- Env.getCurrentSystemInfo().updateCloudBackends(toAdd, toDel);
+ cloudSystemInfoService.updateCloudBackends(toAdd, toDel);
}
}
@@ -299,7 +302,7 @@ public class CloudClusterChecker extends MasterDaemon {
private void checkFeNodesMapValid() {
LOG.debug("begin checkFeNodesMapValid");
- Map<String, List<Backend>> clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+ Map<String, List<Backend>> clusterIdToBackend =
cloudSystemInfoService.getCloudClusterIdToBackend();
Set<String> clusterIds = new HashSet<>();
Set<String> clusterNames = new HashSet<>();
clusterIdToBackend.forEach((clusterId, bes) -> {
@@ -313,7 +316,7 @@ public class CloudClusterChecker extends MasterDaemon {
});
});
- Map<String, String> nameToId =
Env.getCurrentSystemInfo().getCloudClusterNameToId();
+ Map<String, String> nameToId =
cloudSystemInfoService.getCloudClusterNameToId();
nameToId.forEach((clusterName, clusterId) -> {
if (!clusterIdToBackend.containsKey(clusterId)) {
LOG.warn("impossible, somewhere err, clusterId {}, clusterName
{}, clusterNameToIdMap {}",
@@ -404,7 +407,7 @@ public class CloudClusterChecker extends MasterDaemon {
}
private void getCloudBackends() {
- Map<String, List<Backend>> clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+ Map<String, List<Backend>> clusterIdToBackend =
cloudSystemInfoService.getCloudClusterIdToBackend();
//rpc to ms, to get mysql user can use cluster_id
// NOTE: rpc args all empty, use cluster_unique_id to get a instance's
all cluster info.
Cloud.GetClusterResponse response =
CloudSystemInfoService.getCloudCluster("", "", "");
@@ -443,14 +446,14 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
}
- LOG.info("daemon cluster get cluster info succ, current
cloudClusterIdToBackendMap: {}",
- Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
+ LOG.info("daemon cluster get cluster info succ, current
cloudClusterIdToBackendMap: {} clusterNameToId {}",
+ cloudSystemInfoService.getCloudClusterIdToBackend(),
cloudSystemInfoService.getCloudClusterNameToId());
}
private void updateCloudMetrics() {
// Metric
- Map<String, List<Backend>> clusterIdToBackend =
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
- Map<String, String> clusterNameToId =
Env.getCurrentSystemInfo().getCloudClusterNameToId();
+ Map<String, List<Backend>> clusterIdToBackend =
cloudSystemInfoService.getCloudClusterIdToBackend();
+ Map<String, String> clusterNameToId =
cloudSystemInfoService.getCloudClusterNameToId();
for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
long aliveNum = 0L;
List<Backend> bes = clusterIdToBackend.get(entry.getValue());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 888eee5a0c1..f43704beb58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -17,17 +17,22 @@
package org.apache.doris.cloud.catalog;
+import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.NodeInfoPB;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.HttpURLUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.httpv2.meta.MetaBaseAction;
+import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.Storage;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;
@@ -52,7 +57,7 @@ public class CloudEnv extends Env {
public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
- this.cloudClusterCheck = new CloudClusterChecker();
+ this.cloudClusterCheck = new
CloudClusterChecker((CloudSystemInfoService) systemInfo);
}
protected void startMasterOnlyDaemonThreads() {
@@ -355,5 +360,21 @@ public class CloudEnv extends Env {
public long saveTransactionState(CountingDataOutputStream dos, long
checksum) throws IOException {
return checksum;
}
+
+ public void checkCloudClusterPriv(String clusterName) throws DdlException {
+ // check resource usage privilege
+ if
(!Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
+ clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
+ throw new DdlException("USAGE denied to user "
+ + ConnectContext.get().getQualifiedUser() + "'@'" +
ConnectContext.get().getRemoteIP()
+ + "' for cloud cluster '" + clusterName + "'",
ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
+ }
+
+ if (!((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterNames().contains(clusterName)) {
+ LOG.debug("current instance does not have a cluster name :{}",
clusterName);
+ throw new DdlException(String.format("Cluster %s not exist",
clusterName),
+ ErrorCode.ERR_CLOUD_CLUSTER_ERROR);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
index b8329deffbb..2ac0ee8fe67 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
@@ -17,8 +17,8 @@
package org.apache.doris.cloud.catalog;
-import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
@@ -27,23 +27,24 @@ import org.apache.logging.log4j.Logger;
public class CloudInstanceStatusChecker extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(CloudInstanceStatusChecker.class);
+ private CloudSystemInfoService cloudSystemInfoService;
- public CloudInstanceStatusChecker() {
+ public CloudInstanceStatusChecker(CloudSystemInfoService
cloudSystemInfoService) {
super("cloud instance check");
+ this.cloudSystemInfoService = cloudSystemInfoService;
}
@Override
protected void runAfterCatalogReady() {
try {
- Cloud.GetInstanceResponse response =
- Env.getCurrentSystemInfo().getCloudInstance();
+ Cloud.GetInstanceResponse response =
cloudSystemInfoService.getCloudInstance();
LOG.debug("get from ms response {}", response);
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() !=
Cloud.MetaServiceCode.OK) {
LOG.warn("failed to get cloud instance due to incomplete
response, "
+ "cloud_unique_id={}, response={}",
Config.cloud_unique_id, response);
} else {
-
Env.getCurrentSystemInfo().setInstanceStatus(response.getInstance().getStatus());
+
cloudSystemInfoService.setInstanceStatus(response.getInstance().getStatus());
}
} catch (Exception e) {
LOG.warn("get instance from ms exception", e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 5054fa7e41e..8e9b279bc5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -20,6 +20,7 @@ package org.apache.doris.cloud.catalog;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
@@ -81,7 +82,7 @@ public class CloudReplica extends Replica {
}
private long getColocatedBeId(String cluster) {
- List<Backend> bes =
Env.getCurrentSystemInfo().getBackendsByClusterId(cluster);
+ List<Backend> bes = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getBackendsByClusterId(cluster);
List<Backend> availableBes = new ArrayList<>();
for (Backend be : bes) {
if (be.isAlive()) {
@@ -110,7 +111,7 @@ public class CloudReplica extends Replica {
if
(!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
cluster = context.getSessionVariable().getCloudCluster();
try {
- Env.getCurrentEnv().checkCloudClusterPriv(cluster);
+ ((CloudEnv)
Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
} catch (Exception e) {
LOG.warn("get cluster by session context exception");
return -1;
@@ -127,7 +128,8 @@ public class CloudReplica extends Replica {
// check default cluster valid.
if (!Strings.isNullOrEmpty(cluster)) {
- boolean exist =
Env.getCurrentSystemInfo().getCloudClusterNames().contains(cluster);
+ boolean exist = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCloudClusterNames().contains(cluster);
if (!exist) {
//can't use this default cluster, plz change another
LOG.warn("cluster: {} is not existed", cluster);
@@ -140,12 +142,12 @@ public class CloudReplica extends Replica {
// if cluster is SUSPENDED, wait
try {
- Env.waitForAutoStart(cluster);
+ CloudSystemInfoService.waitForAutoStart(cluster);
} catch (DdlException e) {
// this function cant throw exception. so just log it
LOG.warn("cant resume cluster {}", cluster);
}
- String clusterId =
Env.getCurrentSystemInfo().getCloudClusterIdByName(cluster);
+ String clusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);
if (isColocated()) {
return getColocatedBeId(clusterId);
@@ -201,7 +203,8 @@ public class CloudReplica extends Replica {
public long hashReplicaToBe(String clusterId, boolean isBackGround) {
// TODO(luwei) list should be sorted
- List<Backend> clusterBes =
Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId);
+ List<Backend> clusterBes = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getBackendsByClusterId(clusterId);
// use alive be to exec sql
List<Backend> availableBes = new ArrayList<>();
for (Backend be : clusterBes) {
@@ -247,7 +250,8 @@ public class CloudReplica extends Replica {
public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround,
int replicaNum) {
// TODO(luwei) list should be sorted
- List<Backend> clusterBes =
Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId);
+ List<Backend> clusterBes = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getBackendsByClusterId(clusterId);
// use alive be to exec sql
List<Backend> availableBes = new ArrayList<>();
for (Backend be : clusterBes) {
@@ -325,7 +329,8 @@ public class CloudReplica extends Replica {
int count = in.readInt();
for (int i = 0; i < count; ++i) {
String clusterId = Text.readString(in);
- String realClusterId =
Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterId);
+ String realClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCloudClusterIdByName(clusterId);
LOG.debug("cluster Id {}, real cluster Id {}", clusterId,
realClusterId);
if (!Strings.isNullOrEmpty(realClusterId)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index f56cec83b99..8381d2d0959 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
@@ -61,11 +62,12 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
throw new MetaNotFoundException("cluster name is empty");
}
- this.cloudClusterId =
Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterName);
+ this.cloudClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCloudClusterIdByName(clusterName);
if
(!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
clusterName = context.getSessionVariable().getCloudCluster();
this.cloudClusterId =
-
Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterName);
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName);
}
if (Strings.isNullOrEmpty(this.cloudClusterId)) {
LOG.warn("cluster id is empty, cluster name {}", clusterName);
@@ -77,7 +79,8 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
private AutoCloseConnectContext buildConnectContext() throws UserException
{
cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
- String clusterName =
Env.getCurrentSystemInfo().getClusterNameByClusterId(cloudClusterId);
+ String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getClusterNameByClusterId(cloudClusterId);
if (Strings.isNullOrEmpty(clusterName)) {
LOG.warn("cluster name is empty, cluster id is {}",
cloudClusterId);
throw new UserException("cluster name is empty, cluster id is: " +
cloudClusterId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 681f7e195e6..dba7daef355 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -20,9 +20,13 @@ package org.apache.doris.cloud.system;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.Tag;
@@ -32,6 +36,7 @@ import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
@@ -41,13 +46,30 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class CloudSystemInfoService extends SystemInfoService {
private static final Logger LOG =
LogManager.getLogger(CloudSystemInfoService.class);
+ // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk
of inconsistency
+ // use exclusive lock to make sure only one thread can change
clusterIdToBackend and clusterNameToId
+ protected ReentrantLock lock = new ReentrantLock();
+
+ // for show cluster and cache user owned cluster
+ // mysqlUserName -> List of ClusterPB
+ private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB =
ImmutableMap.of();
+ // clusterId -> List<Backend>
+ protected Map<String, List<Backend>> clusterIdToBackend = new
ConcurrentHashMap<>();
+ // clusterName -> clusterId
+ protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
+
+ private InstanceInfoPB.Status instanceStatus;
+
@Override
public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs,
@@ -61,7 +83,7 @@ public class CloudSystemInfoService extends SystemInfoService
{
* Gets cloud cluster from remote with either clusterId or clusterName
*
* @param clusterName cluster name
- * @param clusterId cluster id
+ * @param clusterId cluster id
* @return
*/
public static Cloud.GetClusterResponse getCloudCluster(String clusterName,
String clusterId, String userName) {
@@ -86,7 +108,7 @@ public class CloudSystemInfoService extends
SystemInfoService {
LOG.info("nothing to do");
return;
}
- Set<String> existedBes = idToBackendRef.entrySet().stream().map(i ->
i.getValue())
+ Set<String> existedBes = idToBackendRef.values().stream()
.map(i -> i.getHost() + ":" + i.getHeartbeatPort())
.collect(Collectors.toSet());
LOG.debug("deduplication existedBes={}", existedBes);
@@ -157,79 +179,83 @@ public class CloudSystemInfoService extends
SystemInfoService {
public void updateCloudClusterMap(List<Backend> toAdd, List<Backend>
toDel) {
lock.lock();
- Set<String> clusterNameSet = new HashSet<>();
- for (Backend b : toAdd) {
- String clusterName = b.getCloudClusterName();
- String clusterId = b.getCloudClusterId();
- if (clusterName.isEmpty() || clusterId.isEmpty()) {
- LOG.warn("cloud cluster name or id empty: id={}, name={}",
clusterId, clusterName);
- continue;
- }
- clusterNameSet.add(clusterName);
- if (clusterNameSet.size() != 1) {
- LOG.warn("toAdd be list have multi clusterName, please check,
Set: {}", clusterNameSet);
- }
+ try {
+ Set<String> clusterNameSet = new HashSet<>();
+ for (Backend b : toAdd) {
+ String clusterName = b.getCloudClusterName();
+ String clusterId = b.getCloudClusterId();
+ if (clusterName.isEmpty() || clusterId.isEmpty()) {
+ LOG.warn("cloud cluster name or id empty: id={}, name={}",
clusterId, clusterName);
+ continue;
+ }
+ clusterNameSet.add(clusterName);
+ if (clusterNameSet.size() != 1) {
+ LOG.warn("toAdd be list have multi clusterName, please
check, Set: {}", clusterNameSet);
+ }
- clusterNameToId.put(clusterName, clusterId);
- List<Backend> be = clusterIdToBackend.get(clusterId);
- if (be == null) {
- be = new ArrayList<>();
- clusterIdToBackend.put(clusterId, be);
- MetricRepo.registerClusterMetrics(clusterName, clusterId);
- }
- Set<String> existed = be.stream().map(i -> i.getHost() + ":" +
i.getHeartbeatPort())
- .collect(Collectors.toSet());
- // Deduplicate
- // TODO(gavin): consider vpc
- boolean alreadyExisted = existed.contains(b.getHost() + ":" +
b.getHeartbeatPort());
- if (alreadyExisted) {
- LOG.info("BE already existed, clusterName={} clusterId={}
backendNum={} backend={}",
+ clusterNameToId.put(clusterName, clusterId);
+ List<Backend> be = clusterIdToBackend.get(clusterId);
+ if (be == null) {
+ be = new ArrayList<>();
+ clusterIdToBackend.put(clusterId, be);
+ MetricRepo.registerClusterMetrics(clusterName, clusterId);
+ }
+ Set<String> existed = be.stream().map(i -> i.getHost() + ":" +
i.getHeartbeatPort())
+ .collect(Collectors.toSet());
+ // Deduplicate
+ // TODO(gavin): consider vpc
+ boolean alreadyExisted = existed.contains(b.getHost() + ":" +
b.getHeartbeatPort());
+ if (alreadyExisted) {
+ LOG.info("BE already existed, clusterName={} clusterId={}
backendNum={} backend={}",
+ clusterName, clusterId, be.size(), b);
+ continue;
+ }
+ be.add(b);
+ LOG.info("update (add) cloud cluster map, clusterName={}
clusterId={} backendNum={} current backend={}",
clusterName, clusterId, be.size(), b);
- continue;
}
- be.add(b);
- LOG.info("update (add) cloud cluster map, clusterName={}
clusterId={} backendNum={} current backend={}",
- clusterName, clusterId, be.size(), b);
- }
- for (Backend b : toDel) {
- String clusterName = b.getCloudClusterName();
- String clusterId = b.getCloudClusterId();
- // We actually don't care about cluster name here
- if (clusterName.isEmpty() || clusterId.isEmpty()) {
- LOG.warn("cloud cluster name or id empty: id={}, name={}",
clusterId, clusterName);
- continue;
- }
- List<Backend> be = clusterIdToBackend.get(clusterId);
- if (be == null) {
- LOG.warn("try to remove a non-existing cluster, clusterId={}
clusterName={}",
- clusterId, clusterName);
- continue;
- }
- Set<Long> d = toDel.stream().map(i ->
i.getId()).collect(Collectors.toSet());
- be = be.stream().filter(i ->
!d.contains(i.getId())).collect(Collectors.toList());
- // ATTN: clusterId may have zero nodes
- clusterIdToBackend.replace(clusterId, be);
- // such as dropCluster, but no lock
- // ATTN: Empty clusters are treated as dropped clusters.
- if (be.size() == 0) {
- LOG.info("del clusterId {} and clusterName {} due to be nodes
eq 0", clusterId, clusterName);
- boolean succ = clusterNameToId.remove(clusterName, clusterId);
- if (!succ) {
- LOG.warn("impossible, somewhere err, clusterNameToId {}, "
- + "want remove cluster name {}, cluster id {}",
clusterNameToId, clusterName, clusterId);
+ for (Backend b : toDel) {
+ String clusterName = b.getCloudClusterName();
+ String clusterId = b.getCloudClusterId();
+ // We actually don't care about cluster name here
+ if (clusterName.isEmpty() || clusterId.isEmpty()) {
+ LOG.warn("cloud cluster name or id empty: id={}, name={}",
clusterId, clusterName);
+ continue;
}
- clusterIdToBackend.remove(clusterId);
+ List<Backend> be = clusterIdToBackend.get(clusterId);
+ if (be == null) {
+ LOG.warn("try to remove a non-existing cluster,
clusterId={} clusterName={}",
+ clusterId, clusterName);
+ continue;
+ }
+ Set<Long> d = toDel.stream().map(i ->
i.getId()).collect(Collectors.toSet());
+ be = be.stream().filter(i ->
!d.contains(i.getId())).collect(Collectors.toList());
+ // ATTN: clusterId may have zero nodes
+ clusterIdToBackend.replace(clusterId, be);
+ // such as dropCluster, but no lock
+ // ATTN: Empty clusters are treated as dropped clusters.
+ if (be.size() == 0) {
+ LOG.info("del clusterId {} and clusterName {} due to be
nodes eq 0", clusterId, clusterName);
+ boolean succ = clusterNameToId.remove(clusterName,
clusterId);
+ if (!succ) {
+ LOG.warn("impossible, somewhere err, clusterNameToId
{}, "
+ + "want remove cluster name {}, cluster id {}",
+ clusterNameToId, clusterName, clusterId);
+ }
+ clusterIdToBackend.remove(clusterId);
+ }
+ LOG.info("update (del) cloud cluster map, clusterName={}
clusterId={} backendNum={} current backend={}",
+ clusterName, clusterId, be.size(), b);
}
- LOG.info("update (del) cloud cluster map, clusterName={}
clusterId={} backendNum={} current backend={}",
- clusterName, clusterId, be.size(), b);
+ } finally {
+ lock.unlock();
}
- lock.unlock();
}
public static synchronized void updateFrontends(List<Frontend> toAdd,
- List<Frontend> toDel) throws
DdlException {
+ List<Frontend> toDel)
throws DdlException {
LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel);
String masterIp = Env.getCurrentEnv().getMasterHost();
for (Frontend fe : toAdd) {
@@ -248,5 +274,180 @@ public class CloudSystemInfoService extends
SystemInfoService {
LOG.info("dropped cloud frontend={} ", fe);
}
}
-}
+ public void replayAddBackend(Backend newBackend) {
+ super.replayAddBackend(newBackend);
+ List<Backend> toAdd = new ArrayList<>();
+ toAdd.add(newBackend);
+ updateCloudClusterMap(toAdd, new ArrayList<>());
+ }
+
+ public void replayDropBackend(Backend backend) {
+ super.replayDropBackend(backend);
+ List<Backend> toDel = new ArrayList<>();
+ toDel.add(backend);
+ updateCloudClusterMap(new ArrayList<>(), toDel);
+ }
+
+ public boolean availableBackendsExists() {
+ if (FeConstants.runningUnitTest) {
+ return true;
+ }
+ if (null == clusterNameToId || clusterNameToId.isEmpty()) {
+ return false;
+ }
+ return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
+ && clusterIdToBackend.values().stream().anyMatch(list -> list != null && !list.isEmpty());
+ }
+
+ public boolean containClusterName(String clusterName) {
+ return clusterNameToId.containsKey(clusterName);
+ }
+
+ public List<Backend> getBackendsByClusterName(final String clusterName) {
+ String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+ if (clusterId.isEmpty()) {
+ return new ArrayList<>();
+ }
+ return clusterIdToBackend.get(clusterId);
+ }
+
+ public List<Backend> getBackendsByClusterId(final String clusterId) {
+ return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
+ }
+
+ public List<String> getCloudClusterIds() {
+ return new ArrayList<>(clusterIdToBackend.keySet());
+ }
+
+ public String getCloudStatusByName(final String clusterName) {
+ String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+ if (Strings.isNullOrEmpty(clusterId)) {
+ // for rename cluster or dropped cluster
+ LOG.warn("cant find clusterId by clusterName {}", clusterName);
+ return "";
+ }
+ return getCloudStatusById(clusterId);
+ }
+
+ public String getCloudStatusById(final String clusterId) {
+ return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
+ .stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
+ }
+
+ public void updateClusterNameToId(final String newName,
+ final String originalName, final String
clusterId) {
+ lock.lock();
+ try {
+ clusterNameToId.remove(originalName);
+ clusterNameToId.put(newName, clusterId);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String getClusterNameByClusterId(final String clusterId) {
+ String clusterName = "";
+ for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+ if (entry.getValue().equals(clusterId)) {
+ clusterName = entry.getKey();
+ break;
+ }
+ }
+ return clusterName;
+ }
+
+ public void dropCluster(final String clusterId, final String clusterName) {
+ lock.lock();
+ try {
+ clusterNameToId.remove(clusterName, clusterId);
+ clusterIdToBackend.remove(clusterId);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public List<String> getCloudClusterNames() {
+ return new ArrayList<>(clusterNameToId.keySet());
+ }
+
+ // Return the ref of concurrentMap clusterIdToBackend
+ // It should be thread-safe to iterate.
+ // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
+ public Map<String, List<Backend>> getCloudClusterIdToBackend() {
+ return clusterIdToBackend;
+ }
+
+ public String getCloudClusterIdByName(String clusterName) {
+ return clusterNameToId.get(clusterName);
+ }
+
+ public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName)
{
+ String clusterId = clusterNameToId.get(clusterName);
+ if (Strings.isNullOrEmpty(clusterId)) {
+ LOG.warn("cant find clusterId, this cluster may be has been dropped, clusterName={}", clusterName);
+ return ImmutableMap.of();
+ }
+ List<Backend> backends = clusterIdToBackend.get(clusterId);
+ Map<Long, Backend> idToBackend = Maps.newHashMap();
+ for (Backend be : backends) {
+ idToBackend.put(be.getId(), be);
+ }
+ return ImmutableMap.copyOf(idToBackend);
+ }
+
+ // Return the ref of concurrentMap clusterNameToId
+ // It should be thread-safe to iterate.
+ // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
+ public Map<String, String> getCloudClusterNameToId() {
+ return clusterNameToId;
+ }
+
+ public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() {
+ return mysqlUserNameToClusterPB;
+ }
+
+ public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m)
{
+ mysqlUserNameToClusterPB = m;
+ }
+
+ public List<Pair<String, Integer>> getCurrentObFrontends() {
+ List<Frontend> frontends =
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+ List<Pair<String, Integer>> frontendsPair = new ArrayList<>();
+ for (Frontend frontend : frontends) {
+ frontendsPair.add(Pair.of(frontend.getHost(),
frontend.getEditLogPort()));
+ }
+ return frontendsPair;
+ }
+
+ public Cloud.GetInstanceResponse getCloudInstance() {
+ Cloud.GetInstanceRequest.Builder builder =
Cloud.GetInstanceRequest.newBuilder();
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+ final Cloud.GetInstanceRequest pRequest = builder.build();
+ Cloud.GetInstanceResponse response;
+ try {
+ response = MetaServiceProxy.getInstance().getInstance(pRequest);
+ return response;
+ } catch (RpcException e) {
+ LOG.warn("rpcToGetInstance exception: {}", e.getMessage());
+ }
+ return null;
+ }
+
+ public InstanceInfoPB.Status getInstanceStatus() {
+ return this.instanceStatus;
+ }
+
+ public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) {
+ LOG.debug("fe set instance status {}", instanceStatus);
+ if (this.instanceStatus != instanceStatus) {
+ LOG.info("fe change instance status from {} to {}", this.instanceStatus, instanceStatus);
+ }
+ this.instanceStatus = instanceStatus;
+ }
+
+ public static void waitForAutoStart(final String clusterName) throws
DdlException {
+ // TODO: merge from cloud.
+ throw new DdlException("Env.waitForAutoStart unimplemented");
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index b5794464d98..95f0c5831a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionRegistry;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
@@ -1082,11 +1083,12 @@ public class ConnectContext {
}
public String getAuthorizedCloudCluster() {
- List<String> cloudClusterNames =
Env.getCurrentSystemInfo().getCloudClusterNames();
+ List<String> cloudClusterNames = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterNames();
// get all available cluster of the user
for (String cloudClusterName : cloudClusterNames) {
// find a cluster has more than one alive be
- List<Backend> bes =
Env.getCurrentSystemInfo().getBackendsByClusterName(cloudClusterName);
+ List<Backend> bes = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getBackendsByClusterName(cloudClusterName);
AtomicBoolean hasAliveBe = new AtomicBoolean(false);
bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> {
LOG.debug("get a clusterName {}, it's has more than one alive be {}", clusterName, backend);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index f45086f239d..05ba7e073ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -23,10 +23,6 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
-import org.apache.doris.cloud.proto.Cloud;
-import org.apache.doris.cloud.proto.Cloud.ClusterPB;
-import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
-import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -36,11 +32,9 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.NetUtils;
-import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
-import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TStatusCode;
@@ -69,9 +63,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class SystemInfoService {
@@ -88,22 +80,9 @@ public class SystemInfoService {
protected volatile ImmutableMap<Long, Backend> idToBackendRef =
ImmutableMap.of();
protected volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef =
ImmutableMap.of();
- // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk
of inconsistency
- // use exclusive lock to make sure only one thread can change
clusterIdToBackend and clusterNameToId
- protected ReentrantLock lock = new ReentrantLock();
-
- // for show cluster and cache user owned cluster
- // mysqlUserName -> List of ClusterPB
- private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB =
ImmutableMap.of();
- // clusterId -> List<Backend>
- protected Map<String, List<Backend>> clusterIdToBackend = new
ConcurrentHashMap<>();
- // clusterName -> clusterId
- protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
private volatile ImmutableMap<Long, DiskInfo> pathHashToDiskInfoRef =
ImmutableMap.of();
- private InstanceInfoPB.Status instanceStatus;
-
public static class HostInfo implements Comparable<HostInfo> {
public String host;
public int port;
@@ -183,114 +162,6 @@ public class SystemInfoService {
}
};
- public boolean availableBackendsExists() {
- if (FeConstants.runningUnitTest) {
- return true;
- }
- if (null == clusterNameToId || clusterNameToId.isEmpty()) {
- return false;
- }
- return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
- && clusterIdToBackend.values().stream().anyMatch(list -> list
!= null && !list.isEmpty());
- }
-
- public boolean containClusterName(String clusterName) {
- return clusterNameToId.containsKey(clusterName);
- }
-
- public List<Backend> getBackendsByClusterName(final String clusterName) {
- String clusterId = clusterNameToId.getOrDefault(clusterName, "");
- if (clusterId.isEmpty()) {
- return new ArrayList<>();
- }
- return clusterIdToBackend.get(clusterId);
- }
-
- public List<Backend> getBackendsByClusterId(final String clusterId) {
- return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
- }
-
- public List<String> getCloudClusterIds() {
- return new ArrayList<>(clusterIdToBackend.keySet());
- }
-
- public String getCloudStatusByName(final String clusterName) {
- String clusterId = clusterNameToId.getOrDefault(clusterName, "");
- if (Strings.isNullOrEmpty(clusterId)) {
- // for rename cluster or dropped cluster
- LOG.warn("cant find clusterId by clusterName {}", clusterName);
- return "";
- }
- return getCloudStatusById(clusterId);
- }
-
- public String getCloudStatusById(final String clusterId) {
- return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
-
.stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
- }
-
- public void updateClusterNameToId(final String newName,
- final String originalName, final String clusterId) {
- lock.lock();
- clusterNameToId.remove(originalName);
- clusterNameToId.put(newName, clusterId);
- lock.unlock();
- }
-
- public String getClusterNameByClusterId(final String clusterId) {
- String clusterName = "";
- for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
- if (entry.getValue().equals(clusterId)) {
- clusterName = entry.getKey();
- break;
- }
- }
- return clusterName;
- }
-
- public void dropCluster(final String clusterId, final String clusterName) {
- lock.lock();
- clusterNameToId.remove(clusterName, clusterId);
- clusterIdToBackend.remove(clusterId);
- lock.unlock();
- }
-
- public List<String> getCloudClusterNames() {
- return new ArrayList<>(clusterNameToId.keySet());
- }
-
- // Return the ref of concurrentMap clusterIdToBackend
- // It should be thread-safe to iterate.
- // reference:
https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
- public Map<String, List<Backend>> getCloudClusterIdToBackend() {
- return clusterIdToBackend;
- }
-
- public String getCloudClusterIdByName(String clusterName) {
- return clusterNameToId.get(clusterName);
- }
-
- public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName)
{
- String clusterId = clusterNameToId.get(clusterName);
- if (Strings.isNullOrEmpty(clusterId)) {
- LOG.warn("cant find clusterId, this cluster may be has been
dropped, clusterName={}", clusterName);
- return ImmutableMap.of();
- }
- List<Backend> backends = clusterIdToBackend.get(clusterId);
- Map<Long, Backend> idToBackend = Maps.newHashMap();
- for (Backend be : backends) {
- idToBackend.put(be.getId(), be);
- }
- return ImmutableMap.copyOf(idToBackend);
- }
-
- // Return the ref of concurrentMap clusterNameToId
- // It should be thread-safe to iterate.
- // reference:
https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
- public Map<String, String> getCloudClusterNameToId() {
- return clusterNameToId;
- }
-
public static TPaloNodesInfo createAliveNodesInfo() {
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
@@ -301,23 +172,6 @@ public class SystemInfoService {
return nodesInfo;
}
- public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() {
- return mysqlUserNameToClusterPB;
- }
-
- public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m)
{
- mysqlUserNameToClusterPB = m;
- }
-
- public List<Pair<String, Integer>> getCurrentObFrontends() {
- List<Frontend> frontends =
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
- List<Pair<String, Integer>> frontendsPair = new ArrayList<>();
- for (Frontend frontend : frontends) {
- frontendsPair.add(Pair.of(frontend.getHost(),
frontend.getEditLogPort()));
- }
- return frontendsPair;
- }
-
// for deploy manager
public void addBackends(List<HostInfo> hostInfos, boolean isFree)
throws UserException {
@@ -1141,35 +995,4 @@ public class SystemInfoService {
public long aliveBECount() {
return
idToBackendRef.values().stream().filter(Backend::isAlive).count();
}
-
- public Cloud.GetInstanceResponse getCloudInstance() {
- Cloud.GetInstanceRequest.Builder builder =
- Cloud.GetInstanceRequest.newBuilder();
- builder.setCloudUniqueId(Config.cloud_unique_id);
- final Cloud.GetInstanceRequest pRequest = builder.build();
- Cloud.GetInstanceResponse response;
- try {
- response = MetaServiceProxy.getInstance().getInstance(pRequest);
- return response;
- } catch (RpcException e) {
- LOG.warn("rpcToGetInstance exception: {}", e.getMessage());
- }
- return null;
- }
-
- public InstanceInfoPB.Status getInstanceStatus() {
- return this.instanceStatus;
- }
-
- public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) {
- LOG.debug("fe set instance status {}", instanceStatus);
- if (this.instanceStatus != instanceStatus) {
- LOG.info("fe change instance status from {} to {}",
this.instanceStatus, instanceStatus);
- }
- this.instanceStatus = instanceStatus;
- }
-
- public synchronized void updateCloudBackends(List<Backend> toAdd,
List<Backend> toDel) {
- LOG.warn("Not cloud mode, should not be here");
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]