This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ef2f4699452 [fix](cloud tvf) Fix tvf query run in cloud multi cluster
#37157 (#39249)
ef2f4699452 is described below
commit ef2f46994529c9c420a60a183235afcaeabea85c
Author: deardeng <[email protected]>
AuthorDate: Tue Aug 13 09:28:17 2024 +0800
[fix](cloud tvf) Fix tvf query run in cloud multi cluster #37157 (#39249)
cherry pick from #37157
---
.../doris/alter/AlterLightSchChangeHelper.java | 5 +-
.../java/org/apache/doris/alter/SystemHandler.java | 10 +-
.../analysis/AdminCancelRebalanceDiskStmt.java | 5 +-
.../apache/doris/analysis/AdminCleanTrashStmt.java | 5 +-
.../doris/analysis/AdminRebalanceDiskStmt.java | 11 +-
.../apache/doris/analysis/ShowTrashDiskStmt.java | 4 +-
.../org/apache/doris/analysis/ShowTrashStmt.java | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 4 +-
.../org/apache/doris/catalog/StorageVaultMgr.java | 13 +-
.../org/apache/doris/catalog/TabletStatMgr.java | 9 +-
.../org/apache/doris/clone/TabletScheduler.java | 8 +-
.../doris/cloud/system/CloudSystemInfoService.java | 16 +--
.../apache/doris/common/proc/ReplicasProcNode.java | 5 +-
.../apache/doris/common/proc/TabletsProcDir.java | 9 +-
.../org/apache/doris/common/proc/TrashProcDir.java | 8 +-
.../common/publish/ClusterStatePublisher.java | 140 ---------------------
.../doris/common/publish/TopicPublisherThread.java | 8 +-
.../apache/doris/common/util/AutoBucketUtils.java | 17 ++-
.../doris/datasource/FederationBackendPolicy.java | 6 +-
.../apache/doris/datasource/FileQueryScanNode.java | 2 +-
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 11 +-
.../doris/httpv2/rest/manager/NodeAction.java | 2 +-
.../doris/httpv2/restv2/StatisticAction.java | 17 ++-
.../org/apache/doris/load/GroupCommitManager.java | 10 +-
.../org/apache/doris/load/StreamLoadRecordMgr.java | 13 +-
.../doris/metric/SimpleCoreMetricVisitor.java | 15 ++-
.../BackendDistributedPlanWorkerManager.java | 2 +-
.../planner/BackendPartitionedSchemaScanNode.java | 8 +-
.../java/org/apache/doris/policy/PolicyMgr.java | 2 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +-
.../apache/doris/qe/InsertStreamTxnExecutor.java | 2 +-
.../org/apache/doris/qe/QueryCancelWorker.java | 13 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 5 +-
.../apache/doris/qe/cache/CacheCoordinator.java | 6 +-
.../apache/doris/resource/AdmissionControl.java | 10 +-
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../java/org/apache/doris/system/HeartbeatMgr.java | 10 +-
.../org/apache/doris/system/SystemInfoService.java | 97 ++++++--------
.../tablefunction/DataGenTableValuedFunction.java | 4 +-
.../ExternalFileTableValuedFunction.java | 13 +-
.../tablefunction/NumbersTableValuedFunction.java | 2 +-
.../java/org/apache/doris/alter/AlterTest.java | 2 +-
.../analysis/AdminCancelRebalanceDiskStmtTest.java | 8 +-
.../org/apache/doris/analysis/CopyIntoTest.java | 2 +-
.../CreateTableElasticOnStorageMediumTest.java | 4 +-
.../doris/catalog/DynamicPartitionTableTest.java | 5 +-
.../apache/doris/catalog/ModifyBackendTest.java | 8 +-
.../doris/clone/AddReplicaChoseMediumTest.java | 2 +-
.../org/apache/doris/clone/BalanceStatistic.java | 5 +-
.../ColocateTableCheckerAndBalancerPerfTest.java | 2 +-
.../org/apache/doris/clone/DecommissionTest.java | 4 +-
.../doris/clone/DiskReblanceWhenSchedulerIdle.java | 2 +-
.../doris/clone/TabletRepairAndBalanceTest.java | 2 +-
.../doris/clone/TabletReplicaTooSlowTest.java | 2 +-
.../doris/cluster/DecommissionBackendTest.java | 32 ++---
.../doris/cluster/SystemInfoServiceTest.java | 2 +-
.../doris/common/util/AutoBucketUtilsTest.java | 7 +-
.../doris/load/sync/canal/CanalSyncDataTest.java | 2 +-
.../doris/planner/FederationBackendPolicyTest.java | 6 +-
.../apache/doris/planner/ResourceTagQueryTest.java | 2 +-
.../doris/utframe/DemoMultiBackendsTest.java | 6 +-
.../org/apache/doris/regression/suite/Suite.groovy | 1 -
.../suites/cloud_p0/multi_cluster/test_tvf.groovy | 86 +++++++++++++
64 files changed, 403 insertions(+), 326 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
index 70a322a8c5f..29130934817 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder;
@@ -156,14 +157,14 @@ public class AlterLightSchChangeHelper {
Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new
HashMap<>();
try {
for (Long beId : beIdToRequest.keySet()) {
- final Backend backend =
Env.getCurrentSystemInfo().getIdToBackend().get(beId);
+ final Backend backend =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().get(beId);
final TNetworkAddress address =
new
TNetworkAddress(Objects.requireNonNull(backend).getHost(),
backend.getBrpcPort());
final Future<PFetchColIdsResponse> responseFuture =
BackendServiceProxy.getInstance()
.getColumnIdsByTabletIds(address,
beIdToRequest.get(beId));
beIdToRespFuture.put(beId, responseFuture);
}
- } catch (RpcException e) {
+ } catch (RpcException | UserException e) {
throw new IllegalStateException("fetch columnIds RPC failed", e);
}
// wait for and get results
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index 26842806483..e0909088f8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -288,7 +288,15 @@ public class SystemHandler extends AlterHandler {
Set<Tag> decommissionTags = decommissionBackends.stream().map(be ->
be.getLocationTag())
.collect(Collectors.toSet());
Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
- for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+ List<Backend> bes;
+ try {
+ bes =
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().asList();
+ } catch (UserException e) {
+ LOG.warn("Failed to get current cluster backend by current
cluster.", e);
+ return;
+ }
+
+ for (Backend backend : bes) {
long beId = backend.getId();
if (!backend.isScheduleAvailable()
|| decommissionBackends.stream().anyMatch(be -> be.getId()
== beId)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
index 648e7ab47f1..874a1af1368 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -36,8 +37,8 @@ import java.util.Map;
public class AdminCancelRebalanceDiskStmt extends DdlStmt {
private List<Backend> backends = Lists.newArrayList();
- public AdminCancelRebalanceDiskStmt(List<String> backends) {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ public AdminCancelRebalanceDiskStmt(List<String> backends) throws
UserException {
+ ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Map<String, Long> backendsID = new HashMap<String, Long>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(NetUtils.getHostPortInAccessibleFormat(backend.getHost(),
backend.getHeartbeatPort()),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java
index 64f1cccf5b6..b6d5cc2ce19 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -36,8 +37,8 @@ import java.util.Map;
public class AdminCleanTrashStmt extends DdlStmt {
private List<Backend> backends = Lists.newArrayList();
- public AdminCleanTrashStmt(List<String> backends) {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ public AdminCleanTrashStmt(List<String> backends) throws UserException {
+ ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Map<String, Long> backendsID = new HashMap<String, Long>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
index 69f230f33b6..aa48a9a1fc1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
@@ -28,17 +28,26 @@ import org.apache.doris.system.Backend;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AdminRebalanceDiskStmt extends DdlStmt {
+ private static final Logger LOG =
LogManager.getLogger(AdminRebalanceDiskStmt.class);
private List<Backend> backends = Lists.newArrayList();
private long timeoutS = 0;
public AdminRebalanceDiskStmt(List<String> backends) {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> backendsInfo;
+ try {
+ backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("failed to get backends,", e);
+ return;
+ }
Map<String, Long> backendsID = new HashMap<String, Long>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java
index cdd3243dfc2..f5fad57d0f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java
@@ -36,8 +36,8 @@ public class ShowTrashDiskStmt extends ShowStmt {
private Backend backend;
- public ShowTrashDiskStmt(String backendQuery) {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ public ShowTrashDiskStmt(String backendQuery) throws AnalysisException {
+ ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
for (Backend backend : backendsInfo.values()) {
String backendStr =
NetUtils.getHostPortInAccessibleFormat(backend.getHost(),
backend.getHeartbeatPort());
if (backendQuery.equals(backendStr)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java
index c69980504d9..3071a657c53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java
@@ -37,8 +37,8 @@ import java.util.List;
public class ShowTrashStmt extends ShowStmt {
private List<Backend> backends = Lists.newArrayList();
- public ShowTrashStmt() {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ public ShowTrashStmt() throws AnalysisException {
+ ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
for (Backend backend : backendsInfo.values()) {
this.backends.add(backend);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 32e9d16721e..85cac925a47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -6168,8 +6168,8 @@ public class Env {
AgentTaskExecutor.submit(batchTask);
}
- public void cleanUDFCacheTask(DropFunctionStmt stmt) {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ public void cleanUDFCacheTask(DropFunctionStmt stmt) throws UserException {
+ ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
String functionSignature = stmt.signatureString();
AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backendsInfo.values()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
index c9f254e8ed9..1b9426e4570 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
@@ -24,10 +24,12 @@ import
org.apache.doris.cloud.proto.Cloud.AlterObjStoreInfoRequest.Operation;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
@@ -35,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
@@ -176,7 +179,15 @@ public class StorageVaultMgr {
}
private void alterSyncVaultTask() {
- systemInfoService.getAllBackends().forEach(backend -> {
+ List<Backend> bes;
+ try {
+ // get system all backends
+ bes =
systemInfoService.getAllBackendsByAllCluster().values().asList();
+ } catch (UserException e) {
+ LOG.warn("failed to get current cluster backends: {}", e);
+ return;
+ }
+ bes.forEach(backend -> {
TNetworkAddress address = backend.getBrpcAddress();
try {
BackendServiceProxy.getInstance().alterVaultSync(address,
PAlterVaultSyncRequest.newBuilder().build());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index bf24d2bb390..030dc4d7a0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
@@ -51,7 +52,13 @@ public class TabletStatMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- ImmutableMap<Long, Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> backends;
+ try {
+ backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("can't get backends info", e);
+ return;
+ }
long start = System.currentTimeMillis();
taskPool.submit(() -> {
// no need to get tablet stat if backend is not alive
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 842e847bea0..a83308a650b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -191,7 +191,13 @@ public class TabletScheduler extends MasterDaemon {
* update working slots at the beginning of each round
*/
private boolean updateWorkingSlots() {
- ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap();
+ ImmutableMap<Long, Backend> backends;
+ try {
+ backends = infoService.getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("failed to get backends with current cluster", e);
+ return false;
+ }
for (Backend backend : backends.values()) {
if (!backend.hasPathHash() && backend.isAlive()) {
// when upgrading, backend may not get path info yet. so
return false and wait for next round.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index bf6ed760874..48728efb003 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -24,6 +24,7 @@ import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.ClusterPB;
import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@@ -368,25 +369,18 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
@Override
- public List<Backend> getBackendsByCurrentCluster() throws UserException {
+ public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws
AnalysisException {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
- throw new UserException("connect context is null");
+ throw new AnalysisException("connect context is null");
}
String cluster = ctx.getCurrentCloudCluster();
if (Strings.isNullOrEmpty(cluster)) {
- throw new UserException("cluster name is empty");
+ throw new AnalysisException("cluster name is empty");
}
- //((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
-
- return getBackendsByClusterName(cluster);
- }
-
- @Override
- public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster()
throws UserException {
- List<Backend> backends = getBackendsByCurrentCluster();
+ List<Backend> backends = getBackendsByClusterName(cluster);
Map<Long, Backend> idToBackend = Maps.newHashMap();
for (Backend be : backends) {
idToBackend.put(be.getId(), be);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index edf2a9d3517..8f38b34a2b1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
@@ -57,8 +58,8 @@ public class ReplicasProcNode implements ProcNodeInterface {
}
@Override
- public ProcResult fetchResult() {
- ImmutableMap<Long, Backend> backendMap =
Env.getCurrentSystemInfo().getIdToBackend();
+ public ProcResult fetchResult() throws AnalysisException {
+ ImmutableMap<Long, Backend> backendMap =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index 46c89eb3253..12c1adf71d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -65,10 +65,11 @@ public class TabletsProcDir implements ProcDirInterface {
this.index = index;
}
- public List<List<Comparable>> fetchComparableResult(long version, long
backendId, Replica.ReplicaState state) {
+ public List<List<Comparable>> fetchComparableResult(long version, long
backendId, Replica.ReplicaState state)
+ throws AnalysisException {
Preconditions.checkNotNull(table);
Preconditions.checkNotNull(index);
- ImmutableMap<Long, Backend> backendMap =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> backendMap =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
List<List<Comparable>> tabletInfos = new ArrayList<List<Comparable>>();
Map<Long, String> pathHashToRoot = new HashMap<>();
@@ -179,12 +180,12 @@ public class TabletsProcDir implements ProcDirInterface {
return tabletInfos;
}
- private List<List<Comparable>> fetchComparableResult() {
+ private List<List<Comparable>> fetchComparableResult() throws
AnalysisException {
return fetchComparableResult(-1, -1, null);
}
@Override
- public ProcResult fetchResult() {
+ public ProcResult fetchResult() throws AnalysisException {
List<List<Comparable>> tabletInfos = fetchComparableResult();
// sort by tabletId, replicaId
ListComparator<List<Comparable>> comparator = new
ListComparator<List<Comparable>>(0, 1);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java
index 18605b37c0e..37e5e345181 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java
@@ -50,7 +50,13 @@ public class TrashProcDir implements ProcDirInterface {
private List<Backend> backends = Lists.newArrayList();
public TrashProcDir() {
- ImmutableMap<Long, Backend> backendsInfo =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> backendsInfo;
+ try {
+ backendsInfo =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("Can't get backends info", e);
+ return;
+ }
for (Backend backend : backendsInfo.values()) {
this.backends.add(backend);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java
deleted file mode 100644
index 70a3721d822..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.publish;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.ClientPool;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.BackendService;
-import org.apache.doris.thrift.TAgentPublishRequest;
-import org.apache.doris.thrift.TAgentResult;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStatusCode;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-
-// This class intend to publish the state of cluster to backends.
-public class ClusterStatePublisher {
- private static final Logger LOG =
LogManager.getLogger(ClusterStatePublisher.class);
- private static volatile ClusterStatePublisher INSTANCE;
-
- private ExecutorService executor = ThreadPoolManager
- .newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true);
-
- private SystemInfoService clusterInfoService;
-
- // Use public for unit test easily.
- public ClusterStatePublisher(SystemInfoService clusterInfoService) {
- this.clusterInfoService = clusterInfoService;
- }
-
- public static ClusterStatePublisher getInstance() {
- if (INSTANCE == null) {
- synchronized (ClusterStatePublisher.class) {
- if (INSTANCE == null) {
- INSTANCE = new
ClusterStatePublisher(Env.getCurrentSystemInfo());
- }
- }
- }
- return INSTANCE;
- }
-
- public void publish(ClusterStateUpdate state, Listener listener, int
timeoutMs) {
- Collection<Backend> nodesToPublish =
clusterInfoService.getIdToBackend().values();
- AckResponseHandler handler = new AckResponseHandler(nodesToPublish,
listener);
- for (Backend node : nodesToPublish) {
- executor.submit(new PublishWorker(state, node, handler));
- }
- try {
- if (!handler.awaitAllInMs(timeoutMs)) {
- Backend[] backends = handler.pendingNodes();
- if (backends.length > 0) {
- LOG.warn("timed out waiting for all nodes to publish.
(pending nodes: {})",
- Arrays.toString(backends));
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public class PublishWorker implements Runnable {
- private ClusterStateUpdate stateUpdate;
- private Backend node;
- private ResponseHandler handler;
-
- public PublishWorker(ClusterStateUpdate stateUpdate, Backend node,
ResponseHandler handler) {
- this.stateUpdate = stateUpdate;
- this.node = node;
- this.handler = handler;
- }
-
- @Override
- public void run() {
- // Here to publish all worker
- TNetworkAddress addr = new TNetworkAddress(node.getHost(),
node.getBePort());
- BackendService.Client client = null;
- try {
- client = ClientPool.backendPool.borrowObject(addr);
- } catch (Exception e) {
- LOG.warn("Fetch a agent client failed. backend=[{}]
reason=[{}]", addr, e);
- handler.onFailure(node, e);
- return;
- }
- try {
- TAgentPublishRequest request = stateUpdate.toThrift();
- TAgentResult tAgentResult = null;
- try {
- tAgentResult = client.publishClusterState(request);
- } catch (TException e) {
- // Ok, lets try another time
- if (!ClientPool.backendPool.reopen(client)) {
- // Failed another time, throw this
- throw e;
- }
- tAgentResult = client.publishClusterState(request);
- }
- if (tAgentResult.getStatus().getStatusCode() !=
TStatusCode.OK) {
- // Success execute, no dirty data possibility
- LOG.warn("Backend execute publish failed. backend=[{}],
message=[{}]",
- addr, tAgentResult.getStatus().getErrorMsgs());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Success publish to backend([{}])", addr);
- }
- // Publish here
- handler.onResponse(node);
- } catch (TException e) {
- LOG.warn("A thrift exception happened when publish to a
backend. backend=[{}], reason=[{}]", addr, e);
- handler.onFailure(node, e);
- ClientPool.backendPool.invalidateObject(addr, client);
- client = null;
- } finally {
- ClientPool.backendPool.returnObject(addr, client);
- }
- }
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index 2407e3a2516..74cefeca4d9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -76,7 +76,13 @@ public class TopicPublisherThread extends MasterDaemon {
// because it may means workload group/policy is dropped
// step 2: publish topic info to all be
- Collection<Backend> nodesToPublish =
clusterInfoService.getIdToBackend().values();
+ Collection<Backend> nodesToPublish;
+ try {
+ nodesToPublish =
clusterInfoService.getAllBackendsByAllCluster().values();
+ } catch (Exception e) {
+ LOG.warn("get backends failed", e);
+ return;
+ }
AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
for (Backend be : nodesToPublish) {
executor.submit(new TopicPublishWorker(request, be, handler));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
index f9291c4cea8..19c4c4bf369 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -37,7 +38,13 @@ public class AutoBucketUtils {
private static int getBENum() {
SystemInfoService infoService = Env.getCurrentSystemInfo();
- ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap();
+ ImmutableMap<Long, Backend> backends;
+ try {
+ backends = infoService.getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ logger.warn("failed to get backends with current cluster", e);
+ return 0;
+ }
int activeBENum = 0;
for (Backend backend : backends.values()) {
@@ -50,7 +57,13 @@ public class AutoBucketUtils {
private static int getBucketsNumByBEDisks() {
SystemInfoService infoService = Env.getCurrentSystemInfo();
- ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap();
+ ImmutableMap<Long, Backend> backends;
+ try {
+ backends = infoService.getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ logger.warn("failed to get backends with current cluster", e);
+ return 0;
+ }
int buckets = 0;
for (Backend backend : backends.values()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 7938fba4d28..a2b902fd744 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -184,9 +184,11 @@ public class FederationBackendPolicy {
}
public void init(BeSelectionPolicy policy) throws UserException {
-
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getBackendsByCurrentCluster()));
+ backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo()
+ .getBackendsByCurrentCluster().values().asList()));
if (backends.isEmpty()) {
- throw new UserException("No available backends");
+ throw new UserException("No available backends, "
+ + "in cloud maybe this cluster has been dropped, please `use
@otherClusterName` switch it");
}
for (Backend backend : backends) {
assignedWeightPerBackend.put(backend, 0L);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index df3fbca56d4..97fff0cf1a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -284,7 +284,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
TScanRangeLocation location = new TScanRangeLocation();
long backendId = ConnectContext.get().getBackendId();
- Backend backend =
Env.getCurrentSystemInfo().getIdToBackend().get(backendId);
+ Backend backend =
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(backendId);
location.setBackendId(backendId);
location.setServer(new TNetworkAddress(backend.getHost(),
backend.getBePort()));
curLocations.addToLocations(location);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index 73b6639c7b9..afa341fa879 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogProperty;
@@ -347,10 +348,14 @@ public class JdbcExternalCatalog extends ExternalCatalog {
private void testBeToJdbcConnection() throws DdlException {
Backend aliveBe = null;
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
- if (be.isAlive()) {
- aliveBe = be;
+ try {
+ for (Backend be :
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values()) {
+ if (be.isAlive()) {
+ aliveBe = be;
+ }
}
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
}
if (aliveBe == null) {
throw new DdlException("Test BE Connection to JDBC Failed: No
Alive backends");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index 15d8e51e2c7..814b00b49ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -627,7 +627,7 @@ public class NodeAction extends RestBaseController {
} else if ("DROP".equals(action)) {
currentSystemInfo.dropBackends(hostInfos);
} else if ("DECOMMISSION".equals(action)) {
- ImmutableMap<Long, Backend> backendsInCluster =
currentSystemInfo.getAllBackendsMap();
+ ImmutableMap<Long, Backend> backendsInCluster =
currentSystemInfo.getAllBackendsByAllCluster();
backendsInCluster.forEach((k, v) -> {
hostInfos.stream()
.filter(h -> v.getHost().equals(h.getHost()) &&
v.getHeartbeatPort() == h.getPort())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
index 1c0c2d8f817..dcc847275c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.restv2;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
@@ -80,7 +81,13 @@ public class StatisticAction extends RestBaseController {
private long getDiskOccupancy(SystemInfoService infoService) {
long diskOccupancy = 0;
- List<Backend> backends = infoService.getAllBackends();
+ List<Backend> backends;
+ try {
+ backends =
infoService.getAllBackendsByAllCluster().values().asList();
+ } catch (UserException e) {
+ LOG.warn("failed to get backends by current cluster", e);
+ return 0;
+ }
for (Backend be : backends) {
diskOccupancy += be.getDataUsedCapacityB();
}
@@ -89,7 +96,13 @@ public class StatisticAction extends RestBaseController {
private long getRemainDisk(SystemInfoService infoService) {
long remainDisk = 0;
- List<Backend> backends = infoService.getAllBackends();
+ List<Backend> backends;
+ try {
+ backends =
infoService.getAllBackendsByAllCluster().values().asList();
+ } catch (UserException e) {
+ LOG.warn("failed to get backends by current cluster", e);
+ return 0;
+ }
for (Backend be : backends) {
remainDisk += be.getAvailableCapacityB();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 51f4ef8e6c0..1009c4257b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@@ -279,7 +280,14 @@ public class GroupCommitManager {
return cachedBackendId;
}
- List<Backend> backends = new
ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends());
+ List<Backend> backends = new ArrayList<>();
+ try {
+ backends = new
ArrayList<>(Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList());
+ } catch (AnalysisException e) {
+ LOG.warn("failed to get backends by all cluster", e);
+ throw new LoadException(e.getMessage());
+ }
+
if (backends.isEmpty()) {
throw new LoadException("No alive backend");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
index 6c53f354af8..1e1dc192cee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
@@ -20,6 +20,7 @@ package org.apache.doris.load;
import org.apache.doris.analysis.ShowStreamLoadStmt.StreamLoadState;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
@@ -243,7 +244,13 @@ public class StreamLoadRecordMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- ImmutableMap<Long, Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> backends;
+ try {
+ backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("Failed to load backends from system info", e);
+ return;
+ }
long start = System.currentTimeMillis();
int pullRecordSize = 0;
Map<Long, Long> beIdToLastStreamLoad = Maps.newHashMap();
@@ -354,8 +361,8 @@ public class StreamLoadRecordMgr extends MasterDaemon {
}
}
- public void replayFetchStreamLoadRecord(FetchStreamLoadRecord
fetchStreamLoadRecord) {
- ImmutableMap<Long, Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend();
+ public void replayFetchStreamLoadRecord(FetchStreamLoadRecord
fetchStreamLoadRecord) throws AnalysisException {
+ ImmutableMap<Long, Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Map<Long, Long> beIdToLastStreamLoad =
fetchStreamLoadRecord.getBeIdToLastStreamLoad();
for (Backend backend : backends.values()) {
if (beIdToLastStreamLoad.containsKey(backend.getId())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
index 162828d37cd..07cd8353df6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
@@ -18,6 +18,7 @@
package org.apache.doris.metric;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.monitor.jvm.JvmStats;
import org.apache.doris.monitor.jvm.JvmStats.MemoryPool;
import org.apache.doris.monitor.jvm.JvmStats.Threads;
@@ -26,6 +27,8 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
@@ -38,7 +41,7 @@ import java.util.Map;
* query_latency_ms_75 LONG 2
*/
public class SimpleCoreMetricVisitor extends MetricVisitor {
-
+ private static final Logger LOG =
LogManager.getLogger(SimpleCoreMetricVisitor.class);
private static final String TYPE_LONG = "LONG";
private static final String TYPE_DOUBLE = "DOUBLE";
@@ -128,8 +131,14 @@ public class SimpleCoreMetricVisitor extends MetricVisitor
{
@Override
public void visitNodeInfo() {
long feDeadNum =
Env.getCurrentEnv().getFrontends(null).stream().filter(f ->
!f.isAlive()).count();
- long beDeadNum =
Env.getCurrentSystemInfo().getIdToBackend().values().stream().filter(b ->
!b.isAlive())
- .count();
+ long beDeadNum = 0;
+ try {
+ beDeadNum = Env.getCurrentSystemInfo().getAllBackendsByAllCluster()
+ .values().stream().filter(b -> !b.isAlive())
+ .count();
+ } catch (AnalysisException e) {
+ LOG.warn("failed get backend, ", e);
+ }
long brokerDeadNum =
Env.getCurrentEnv().getBrokerMgr().getAllBrokers().stream().filter(b ->
!b.isAlive)
.count();
sb.append("doris_fe_frontend_dead_num").append("
").append(feDeadNum).append("\n");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
index 190d6d898a9..7acbe653e98 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
@@ -32,7 +32,7 @@ import java.util.function.Supplier;
public class BackendDistributedPlanWorkerManager implements
DistributedPlanWorkerManager {
private final Supplier<ImmutableMap<Long, Backend>> backends =
Suppliers.memoize(() -> {
try {
- return
Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster();
+ return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
} catch (Exception t) {
throw new NereidsException("Can not get backends: " + t, t);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index 803afe05d8d..ab2798e2ba7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -120,7 +120,7 @@ public class BackendPartitionedSchemaScanNode extends
SchemaScanNode {
scanRangeLocations = new ArrayList<>();
for (Long partitionID : selectedPartitionIds) {
Long backendId = partitionIDToBackendID.get(partitionID);
- Backend be =
Env.getCurrentSystemInfo().getIdToBackend().get(backendId);
+ Backend be =
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(backendId);
if (!be.isAlive()) {
throw new AnalysisException("backend " + be.getId() + " is not
alive.");
}
@@ -134,7 +134,7 @@ public class BackendPartitionedSchemaScanNode extends
SchemaScanNode {
}
}
- private void computePartitionInfo() throws AnalysisException {
+ private void computePartitionInfo() throws UserException {
List<Column> partitionColumns = new ArrayList<>();
for (SlotDescriptor slotDesc : desc.getSlots()) {
if
(BEACKEND_ID_COLUMN_SET.contains(slotDesc.getColumn().getName().toLowerCase()))
{
@@ -155,11 +155,11 @@ public class BackendPartitionedSchemaScanNode extends
SchemaScanNode {
* @param partitionColumns The Columns we want to create partitionInfo
* @throws AnalysisException
*/
- private void createPartitionInfo(List<Column> partitionColumns) throws
AnalysisException {
+ private void createPartitionInfo(List<Column> partitionColumns) throws
UserException {
backendPartitionInfo = new PartitionInfo(PartitionType.LIST,
partitionColumns);
partitionIDToBackendID = new HashMap<>();
long partitionID = 0;
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ for (Backend be :
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) {
if (be.isAlive()) {
// create partition key
PartitionKey partitionKey = new PartitionKey();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index 361a84f9fe8..6e8bd4f08cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -731,7 +731,7 @@ public class PolicyMgr implements Writable {
// log alter
Env.getCurrentEnv().getEditLog().logAlterStoragePolicy(storagePolicy);
AgentBatchTask batchTask = new AgentBatchTask();
- for (long backendId :
Env.getCurrentSystemInfo().getIdToBackend().keySet()) {
+ for (long backendId :
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().keySet()) {
PushStoragePolicyTask pushStoragePolicyTask = new
PushStoragePolicyTask(backendId,
Collections.singletonList(storagePolicy),
Collections.emptyList(), Collections.emptyList());
batchTask.addTask(pushStoragePolicyTask);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b07e2d7304c..4ee46d3bec3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -565,7 +565,7 @@ public class Coordinator implements CoordInterface {
currentConnectFE = coordAddress;
}
- this.idToBackend =
Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster();
+ this.idToBackend =
Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
if (LOG.isDebugEnabled()) {
int backendNum = idToBackend.size();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index e25323440bb..3cba759abc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -110,7 +110,7 @@ public class InsertStreamTxnExecutor {
throw new UserException("No available backend to match the policy:
" + policy);
}
- Backend backend =
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+ Backend backend =
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(beIds.get(0));
txnConf.setUserIp(backend.getHost());
txnEntry.setBackend(backend);
TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
index 7bb02b0c6c8..1edde03203e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
@@ -22,9 +22,14 @@ import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.List;
+
public class QueryCancelWorker extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(QueryCancelWorker.class);
private SystemInfoService systemInfoService;
public QueryCancelWorker(SystemInfoService systemInfoService) {
@@ -33,7 +38,13 @@ public class QueryCancelWorker extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- List<Backend> allBackends = systemInfoService.getAllBackends();
+ List<Backend> allBackends;
+ try {
+ allBackends =
systemInfoService.getAllBackendsByAllCluster().values().asList();
+ } catch (Exception e) {
+ LOG.warn("failed to get backends by current cluster", e);
+ return;
+ }
for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) {
Status status = co.shouldCancel(allBackends);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 57fe06a8a23..a3d1ca313ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2831,7 +2831,7 @@ public class ShowExecutor {
private void handleAdminShowTabletStorageFormat() throws AnalysisException
{
List<List<String>> resultRowSet = Lists.newArrayList();
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ for (Backend be :
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values()) {
if (be.isQueryAvailable() && be.isLoadAvailable()) {
AgentClient client = new AgentClient(be.getHost(),
be.getBePort());
TCheckStorageFormatResult result = client.checkStorageFormat();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index e4262d4bea2..e0ae5763abb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1638,7 +1638,8 @@ public class StmtExecutor {
throw new UserException(e.getMessage());
}
LOG.info("kill query {}", queryId);
- Collection<Backend> nodesToPublish =
Env.getCurrentSystemInfo().getIdToBackend().values();
+ Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo()
+ .getAllBackendsByAllCluster().values();
for (Backend be : nodesToPublish) {
if (be.isAlive()) {
try {
@@ -2071,7 +2072,7 @@ public class StmtExecutor {
// 4. get BE
TNetworkAddress address = null;
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ for (Backend be :
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) {
if (be.isAlive()) {
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
index 529454b9fa3..11fc547e6b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
@@ -18,6 +18,7 @@
package org.apache.doris.qe.cache;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.system.Backend;
@@ -110,7 +111,7 @@ public class CacheCoordinator {
}
try {
belock.lock();
- ImmutableMap<Long, Backend> idToBackend =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> idToBackend =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
if (idToBackend != null) {
if (!debugModel) {
clearBackend(idToBackend);
@@ -120,6 +121,9 @@ public class CacheCoordinator {
}
}
this.lastRefreshTime = System.currentTimeMillis();
+ } catch (UserException e) {
+ LOG.warn("cant get backend", e);
+ throw new RuntimeException(e);
} finally {
belock.unlock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
index ad4e9f94c05..1b9e8913c7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
@@ -17,6 +17,7 @@
package org.apache.doris.resource;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.MasterDaemon;
@@ -31,7 +32,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -70,7 +70,13 @@ public class AdmissionControl extends MasterDaemon {
this.isAllBeMemoryEnough = true;
return;
}
- Collection<Backend> backends =
clusterInfoService.getIdToBackend().values();
+ List<Backend> backends;
+ try {
+ backends =
clusterInfoService.getAllBackendsByAllCluster().values().asList();
+ } catch (AnalysisException e) {
+ LOG.warn("get backends failed", e);
+ throw new RuntimeException(e);
+ }
this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
boolean tmpIsAllBeMemoryEnough = true;
List<Future<InternalService.PGetBeResourceResponse>> futureList = new
ArrayList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e96c0ac1e19..88925770640 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3889,7 +3889,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.setStatus(new TStatus(TStatusCode.OK));
final SystemInfoService systemInfoService =
Env.getCurrentSystemInfo();
- List<Backend> backends = systemInfoService.getAllBackends();
+ List<Backend> backends =
systemInfoService.getAllBackendsByAllCluster().values().asList();
for (Backend backend : backends) {
TBackend tBackend = new TBackend();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 6a8008d6cbc..515db76b096 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
@@ -108,7 +109,14 @@ public class HeartbeatMgr extends MasterDaemon {
List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos();
List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();
// send backend heartbeat
- for (Backend backend : nodeMgr.getIdToBackend().values()) {
+ List<Backend> bes;
+ try {
+ bes = nodeMgr.getAllBackendsByAllCluster().values().asList();
+ } catch (UserException e) {
+ LOG.warn("can not get backends", e);
+ return;
+ }
+ for (Backend backend : bes) {
BackendHeartbeatHandler handler = new
BackendHeartbeatHandler(backend, feInfos);
hbResponses.add(executor.submit(handler));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index a9b48888e3f..836d516c942 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -199,7 +199,7 @@ public class SystemInfoService {
// for test
public void addBackend(Backend backend) {
- Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+ Map<Long, Backend> copiedBackends =
Maps.newHashMap(getAllClusterBackendsNoException());
copiedBackends.put(backend.getId(), backend);
ImmutableMap<Long, Backend> newIdToBackend =
ImmutableMap.copyOf(copiedBackends);
idToBackendRef = newIdToBackend;
@@ -209,7 +209,7 @@ public class SystemInfoService {
private void addBackend(String host, int heartbeatPort, Map<String,
String> tagMap) {
Backend newBackend = new Backend(Env.getCurrentEnv().getNextId(),
host, heartbeatPort);
// update idToBackend
- Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+ Map<Long, Backend> copiedBackends =
Maps.newHashMap(getAllClusterBackendsNoException());
copiedBackends.put(newBackend.getId(), newBackend);
ImmutableMap<Long, Backend> newIdToBackend =
ImmutableMap.copyOf(copiedBackends);
idToBackendRef = newIdToBackend;
@@ -271,7 +271,7 @@ public class SystemInfoService {
.getHostPortInAccessibleFormat(host, heartbeatPort) + "]");
}
// update idToBackend
- Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+ Map<Long, Backend> copiedBackends =
Maps.newHashMap(getAllClusterBackendsNoException());
copiedBackends.remove(droppedBackend.getId());
ImmutableMap<Long, Backend> newIdToBackend =
ImmutableMap.copyOf(copiedBackends);
idToBackendRef = newIdToBackend;
@@ -299,27 +299,11 @@ public class SystemInfoService {
}
public Backend getBackend(long backendId) {
- return idToBackendRef.get(backendId);
- }
-
- public boolean checkBackendLoadAvailable(long backendId) {
- Backend backend = idToBackendRef.get(backendId);
- if (backend == null || !backend.isLoadAvailable()) {
- return false;
- }
- return true;
- }
-
- public boolean checkBackendQueryAvailable(long backendId) {
- Backend backend = idToBackendRef.get(backendId);
- if (backend == null || !backend.isQueryAvailable()) {
- return false;
- }
- return true;
+ return getAllClusterBackendsNoException().get(backendId);
}
public boolean checkBackendScheduleAvailable(long backendId) {
- Backend backend = idToBackendRef.get(backendId);
+ Backend backend = getAllClusterBackendsNoException().get(backendId);
if (backend == null || !backend.isScheduleAvailable()) {
return false;
}
@@ -327,7 +311,7 @@ public class SystemInfoService {
}
public boolean checkBackendAlive(long backendId) {
- Backend backend = idToBackendRef.get(backendId);
+ Backend backend = getAllClusterBackendsNoException().get(backendId);
if (backend == null || !backend.isAlive()) {
return false;
}
@@ -335,7 +319,7 @@ public class SystemInfoService {
}
public Backend getBackendWithHeartbeatPort(String host, int heartPort) {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
for (Backend backend : idToBackend.values()) {
if (backend.getHost().equals(host) && backend.getHeartbeatPort()
== heartPort) {
return backend;
@@ -345,7 +329,7 @@ public class SystemInfoService {
}
public Backend getBackendWithBePort(String ip, int bePort) {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
for (Backend backend : idToBackend.values()) {
if (backend.getHost().equals(ip) && backend.getBePort() == bePort)
{
return backend;
@@ -355,7 +339,7 @@ public class SystemInfoService {
}
public Backend getBackendWithHttpPort(String ip, int httpPort) {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
for (Backend backend : idToBackend.values()) {
if (backend.getHost().equals(ip) && backend.getHttpPort() ==
httpPort) {
return backend;
@@ -377,7 +361,7 @@ public class SystemInfoService {
}
public List<Long> getAllBackendIds(boolean needAlive) {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
List<Long> backendIds = Lists.newArrayList(idToBackend.keySet());
if (!needAlive) {
return backendIds;
@@ -394,7 +378,7 @@ public class SystemInfoService {
}
public List<Long> getDecommissionedBackendIds() {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
List<Long> backendIds = Lists.newArrayList(idToBackend.keySet());
Iterator<Long> iter = backendIds.iterator();
@@ -407,22 +391,20 @@ public class SystemInfoService {
return backendIds;
}
- public List<Backend> getAllBackends() {
- return Lists.newArrayList(idToBackendRef.values());
- }
-
public List<Backend> getMixBackends() {
- return idToBackendRef.values().stream().filter(backend ->
backend.isMixNode()).collect(Collectors.toList());
+ return getAllClusterBackendsNoException().values()
+ .stream().filter(backend ->
backend.isMixNode()).collect(Collectors.toList());
}
public List<Backend> getCnBackends() {
- return idToBackendRef.values().stream().filter(backend ->
backend.isComputeNode()).collect(Collectors.toList());
+ return getAllClusterBackendsNoException()
+
.values().stream().filter(Backend::isComputeNode).collect(Collectors.toList());
}
// return num of backends that from different hosts
public int getStorageBackendNumFromDiffHosts(boolean aliveOnly) {
Set<String> hosts = Sets.newHashSet();
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
for (Backend backend : idToBackend.values()) {
if ((aliveOnly && !backend.isAlive()) || backend.isComputeNode()) {
continue;
@@ -493,7 +475,7 @@ public class SystemInfoService {
TStorageMedium storageMedium, boolean isStorageMediumSpecified,
boolean isOnlyForCheck)
throws DdlException {
- Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+ Map<Long, Backend> copiedBackends =
Maps.newHashMap(getAllClusterBackendsNoException());
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
short totalReplicaNum = 0;
@@ -572,7 +554,7 @@ public class SystemInfoService {
*/
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int
number) {
Preconditions.checkArgument(number >= -1);
- List<Backend> candidates =
policy.getCandidateBackends(idToBackendRef.values());
+ List<Backend> candidates =
policy.getCandidateBackends(getAllClusterBackendsNoException().values());
if (candidates.size() < number || candidates.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not match policy: {}. candidates num: {}, expected:
{}", policy, candidates.size(), number);
@@ -651,14 +633,6 @@ public class SystemInfoService {
}
}
- public ImmutableMap<Long, Backend> getIdToBackend() {
- return idToBackendRef;
- }
-
- public ImmutableMap<Long, Backend> getAllBackendsMap() {
- return idToBackendRef;
- }
-
public long getBackendReportVersion(long backendId) {
AtomicLong atomicLong = null;
if ((atomicLong = idToReportVersionRef.get(backendId)) == null) {
@@ -685,7 +659,7 @@ public class SystemInfoService {
}
public long saveBackends(CountingDataOutputStream dos, long checksum)
throws IOException {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
int backendCount = idToBackend.size();
checksum ^= backendCount;
dos.writeInt(backendCount);
@@ -751,7 +725,7 @@ public class SystemInfoService {
public void replayAddBackend(Backend newBackend) {
// update idToBackend
- Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+ Map<Long, Backend> copiedBackends =
Maps.newHashMap(getAllClusterBackendsNoException());
copiedBackends.put(newBackend.getId(), newBackend);
ImmutableMap<Long, Backend> newIdToBackend =
ImmutableMap.copyOf(copiedBackends);
idToBackendRef = newIdToBackend;
@@ -768,7 +742,7 @@ public class SystemInfoService {
LOG.debug("replayDropBackend: {}", backend);
}
// update idToBackend
- Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
+ Map<Long, Backend> copiedBackends =
Maps.newHashMap(getAllClusterBackendsNoException());
copiedBackends.remove(backend.getId());
ImmutableMap<Long, Backend> newIdToBackend =
ImmutableMap.copyOf(copiedBackends);
idToBackendRef = newIdToBackend;
@@ -805,7 +779,7 @@ public class SystemInfoService {
private long getAvailableCapacityB() {
long capacity = 0L;
- for (Backend backend : idToBackendRef.values()) {
+ for (Backend backend : getAllClusterBackendsNoException().values()) {
// Here we do not check if backend is alive,
// We suppose the dead backends will back to alive later.
if (backend.isDecommissioned()) {
@@ -825,12 +799,21 @@ public class SystemInfoService {
}
}
+ private ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
+ try {
+ return getAllBackendsByAllCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("getAllClusterBackendsNoException: ", e);
+ return ImmutableMap.of();
+ }
+ }
+
/*
* Try to randomly get a backend id by given host.
* If not found, return -1
*/
public long getBackendIdByHost(String host) {
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+ ImmutableMap<Long, Backend> idToBackend =
getAllClusterBackendsNoException();
List<Backend> selectedBackends = Lists.newArrayList();
for (Backend backend : idToBackend.values()) {
if (backend.getHost().equals(host)) {
@@ -995,19 +978,19 @@ public class SystemInfoService {
}
// CloudSystemInfoService override
- public List<Backend> getBackendsByCurrentCluster() throws UserException {
- return idToBackendRef.values().stream().collect(Collectors.toList());
+ public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws
AnalysisException {
+ return idToBackendRef;
}
- // CloudSystemInfoService override
- public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster()
throws UserException {
- return getIdToBackend();
+ // Cloud and NonCloud get all bes
+ public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws
AnalysisException {
+ return idToBackendRef;
}
public int getMinPipelineExecutorSize() {
List<Backend> currentBackends = null;
try {
- currentBackends = getBackendsByCurrentCluster();
+ currentBackends = getAllBackendsByAllCluster().values().asList();
} catch (UserException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("get current cluster backends failed: ", e);
@@ -1026,8 +1009,4 @@ public class SystemInfoService {
}
return minPipelineExecutorSize;
}
-
- public long aliveBECount() {
- return
idToBackendRef.values().stream().filter(Backend::isAlive).count();
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index fc2a6d6dd3e..629b410e676 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -18,7 +18,7 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
import org.apache.doris.planner.DataGenScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
@@ -27,7 +27,7 @@ import org.apache.doris.thrift.TDataGenFunctionName;
import java.util.List;
public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf
{
- public abstract List<TableValuedFunctionTask> getTasks() throws
AnalysisException;
+ public abstract List<TableValuedFunctionTask> getTasks() throws
UserException;
public abstract TDataGenFunctionName getDataGenFunctionName();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 4c2866eb2b4..f586056dc58 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -71,6 +71,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTextSerdeType;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -396,9 +397,17 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
protected Backend getBackend() {
// For the http stream task, we should obtain the be for processing
the task
+ ImmutableMap<Long, Backend> beIdToBe;
+ try {
+ beIdToBe =
Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
+ } catch (AnalysisException e) {
+ LOG.warn("get backend failed, ", e);
+ return null;
+ }
+
if (getTFileType() == TFileType.FILE_STREAM) {
long backendId = ConnectContext.get().getBackendId();
- Backend be =
Env.getCurrentSystemInfo().getIdToBackend().get(backendId);
+ Backend be = beIdToBe.get(backendId);
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} is not alive", backendId);
return null;
@@ -406,7 +415,7 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
return be;
}
}
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ for (Backend be : beIdToBe.values()) {
if (be.isAlive()) {
return be;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
index 8d4a627043e..b40658e4796 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
@@ -116,7 +116,7 @@ public class NumbersTableValuedFunction extends
DataGenTableValuedFunction {
@Override
public List<TableValuedFunctionTask> getTasks() throws AnalysisException {
List<Backend> backendList = Lists.newArrayList();
- for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ for (Backend be :
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) {
if (be.isAlive()) {
backendList.add(be);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
index 177522638ea..35e8b6b91e6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
@@ -94,7 +94,7 @@ public class AlterTest {
Config.enable_odbc_mysql_broker_table = true;
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5);
- List<Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+ List<Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
Map<String, String> tagMap = Maps.newHashMap();
tagMap.put(Tag.TYPE_LOCATION, "group_a");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
index aace70b6e3e..dfc0ed04269 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
@@ -19,7 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.clone.RebalancerTestUtil;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.MockedAuth;
import org.apache.doris.qe.ConnectContext;
@@ -52,7 +52,7 @@ public class AdminCancelRebalanceDiskStmtTest {
}
@Test
- public void testParticularBackends() throws AnalysisException {
+ public void testParticularBackends() throws UserException {
List<String> backends = Lists.newArrayList(
"192.168.0.10003:9051", "192.168.0.10004:9051",
"192.168.0.10005:9051", "192.168.0.10006:9051");
final AdminCancelRebalanceDiskStmt stmt = new
AdminCancelRebalanceDiskStmt(backends);
@@ -61,7 +61,7 @@ public class AdminCancelRebalanceDiskStmtTest {
}
@Test
- public void testEmpty() throws AnalysisException {
+ public void testEmpty() throws UserException {
List<String> backends = Lists.newArrayList();
final AdminCancelRebalanceDiskStmt stmt = new
AdminCancelRebalanceDiskStmt(backends);
stmt.analyze(analyzer);
@@ -69,7 +69,7 @@ public class AdminCancelRebalanceDiskStmtTest {
}
@Test
- public void testNull() throws AnalysisException {
+ public void testNull() throws UserException {
final AdminCancelRebalanceDiskStmt stmt = new
AdminCancelRebalanceDiskStmt(null);
stmt.analyze(analyzer);
Assert.assertEquals(4, stmt.getBackends().size());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java
index 83328403c41..066bec09dc3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java
@@ -247,7 +247,7 @@ public class CopyIntoTest extends TestWithFeService {
minTimes = 0;
result = stages;
- Env.getCurrentSystemInfo().getAllBackendsMap();
+ Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
minTimes = 0;
result = idToBackendRef;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
index 8b09b4b65b5..d9d41e34c0c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
@@ -36,7 +36,7 @@ public class CreateTableElasticOnStorageMediumTest extends
TestWithFeService {
public void setStorageMediumToSSDTest() throws Exception {
SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
- List<Backend> allBackends = clusterInfo.getAllBackends();
+ List<Backend> allBackends =
clusterInfo.getAllBackendsByAllCluster().values().asList();
// set all backends' storage medium to SSD
for (Backend backend : allBackends) {
if (backend.hasPathHash()) {
@@ -59,7 +59,7 @@ public class CreateTableElasticOnStorageMediumTest extends
TestWithFeService {
public void setStorageMediumToHDDTest() throws Exception {
SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
- List<Backend> allBackends = clusterInfo.getAllBackends();
+ List<Backend> allBackends =
clusterInfo.getAllBackendsByAllCluster().values().asList();
// set all backends' storage medium to SSD
for (Backend backend : allBackends) {
if (backend.hasPathHash()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index 419707f7cee..79093d6ed4b 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStorageMedium;
@@ -87,8 +88,8 @@ public class DynamicPartitionTableTest {
UtFrameUtils.cleanDorisFeDir(runningDir);
}
- private static void changeBeDisk(TStorageMedium storageMedium) {
- List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+ private static void changeBeDisk(TStorageMedium storageMedium) throws
UserException {
+ List<Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
for (DiskInfo diskInfo : be.getDisks().values()) {
diskInfo.setStorageMedium(storageMedium);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index e98e6f74545..d83ba15c5bc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -68,7 +68,7 @@ public class ModifyBackendTest {
@Test
public void testModifyBackendTag() throws Exception {
SystemInfoService infoService = Env.getCurrentSystemInfo();
- List<Backend> backends = infoService.getAllBackends();
+ List<Backend> backends =
infoService.getAllBackendsByAllCluster().values().asList();
Assert.assertEquals(1, backends.size());
String beHostPort = backends.get(0).getHost() + ":" +
backends.get(0).getHeartbeatPort();
@@ -76,7 +76,7 @@ public class ModifyBackendTest {
String stmtStr = "alter system modify backend \"" + beHostPort + "\"
set ('tag.location' = 'zone1')";
AlterSystemStmt stmt = (AlterSystemStmt)
UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
DdlExecutor.execute(Env.getCurrentEnv(), stmt);
- backends = infoService.getAllBackends();
+ backends = infoService.getAllBackendsByAllCluster().values().asList();
Assert.assertEquals(1, backends.size());
// create table
@@ -175,13 +175,13 @@ public class ModifyBackendTest {
@Test
public void testModifyBackendAvailableProperty() throws Exception {
SystemInfoService infoService = Env.getCurrentSystemInfo();
- List<Backend> backends = infoService.getAllBackends();
+ List<Backend> backends =
infoService.getAllBackendsByAllCluster().values().asList();
String beHostPort = backends.get(0).getHost() + ":" +
backends.get(0).getHeartbeatPort();
// modify backend available property
String stmtStr = "alter system modify backend \"" + beHostPort + "\"
set ('disable_query' = 'true', 'disable_load' = 'true')";
AlterSystemStmt stmt = (AlterSystemStmt)
UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
DdlExecutor.execute(Env.getCurrentEnv(), stmt);
- Backend backend = infoService.getAllBackends().get(0);
+ Backend backend =
infoService.getAllBackendsByAllCluster().values().asList().get(0);
Assert.assertFalse(backend.isQueryAvailable());
Assert.assertFalse(backend.isLoadAvailable());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
index dee048223b5..6441b9e9bba 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
@@ -50,7 +50,7 @@ public class AddReplicaChoseMediumTest extends
TestWithFeService {
@Test
public void testAddReplicaChoseMedium() throws Exception {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
- List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+ List<Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
Assertions.assertEquals(backendNum(), backends.size());
for (Backend be : backends) {
Assertions.assertEquals(0,
invertedIndex.getTabletNumByBackendId(be.getId()));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
index 174001a1bcc..d903c68fcff 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
+import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
import com.google.common.collect.Maps;
@@ -37,10 +38,10 @@ public class BalanceStatistic {
this.backendTotalReplicaNum = backendTotalReplicaNum;
}
- public static BalanceStatistic getCurrentBalanceStatistic() {
+ public static BalanceStatistic getCurrentBalanceStatistic() throws
UserException {
Map<Long, Long> backendTotalDataSize = Maps.newHashMap();
Map<Long, Integer> backendTotalReplicaNum = Maps.newHashMap();
- List<Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+ List<Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
backends.forEach(be -> {
backendTotalDataSize.put(be.getId(), 0L);
backendTotalReplicaNum.put(be.getId(), 0);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
index c560c62003a..0ff5adadb4c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
@@ -65,7 +65,7 @@ public class ColocateTableCheckerAndBalancerPerfTest {
Config.disable_tablet_scheduler = true;
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 6);
- backends =
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+ backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
for (DiskInfo diskInfo : be.getDisks().values()) {
diskInfo.setTotalCapacityB(10L << 40);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
index d97e3a09549..81e29d3abf3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
@@ -80,7 +80,7 @@ public class DecommissionTest {
// 127.0.0.3
// 127.0.0.4
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 4);
- List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+ List<Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
@@ -139,7 +139,7 @@ public class DecommissionTest {
int totalReplicaNum = 1 * 2400;
checkBalance(1, totalReplicaNum, 4);
- Backend backend = Env.getCurrentSystemInfo().getAllBackends().get(0);
+ Backend backend =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList().get(0);
String decommissionStmtStr = "alter system decommission backend \"" +
backend.getHost()
+ ":" + backend.getHeartbeatPort() + "\"";
AlterSystemStmt decommissionStmt =
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
index 40b6683da3d..860a36f8f63 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
@@ -65,7 +65,7 @@ public class DiskReblanceWhenSchedulerIdle extends
TestWithFeService {
public void testDiskReblanceWhenSchedulerIdle() throws Exception {
// case start
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
- List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+ List<Backend> backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
Assertions.assertEquals(backendNum(), backends.size());
for (Backend be : backends) {
Assertions.assertEquals(0,
invertedIndex.getTabletNumByBackendId(be.getId()));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
index 356d8ed6422..c02afa1db08 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
@@ -131,7 +131,7 @@ public class TabletRepairAndBalanceTest {
Env.getCurrentEnv().createDb(createDbStmt);
// must set disk info, or the tablet scheduler won't work
- backends = Env.getCurrentSystemInfo().getAllBackends();
+ backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
index 7d918ef7db5..97f3f0d878a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
@@ -92,7 +92,7 @@ public class TabletReplicaTooSlowTest {
Env.getCurrentEnv().createDb(createDbStmt);
// must set disk info, or the tablet scheduler won't work
- backends = Env.getCurrentSystemInfo().getAllBackends();
+ backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 775adba0130..fc80ecbd97d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -74,7 +74,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 1. create connect context
connectContext = createDefaultCtx();
- ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Assertions.assertEquals(backendNum(), idToBackendRef.size());
// 2. create database db1
@@ -105,11 +105,11 @@ public class DecommissionBackendTest extends
TestWithFeService {
Assertions.assertTrue(srcBackend.isDecommissioned());
long startTimestamp = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimestamp < 90000
- &&
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+ &&
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId()))
{
Thread.sleep(1000);
}
- Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
// For now, we have pre-built internal table: analysis_job and
column_statistics
Assertions.assertEquals(tabletNum,
@@ -117,14 +117,14 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 6. add backend
addNewBackend();
- Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
}
@Test
public void testDecommissionBackendById() throws Exception {
// 1. create connect context
connectContext = createDefaultCtx();
- ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Assertions.assertEquals(backendNum(), idToBackendRef.size());
// 2. create database db1
@@ -158,15 +158,15 @@ public class DecommissionBackendTest extends
TestWithFeService {
Assertions.assertTrue(srcBackend.isDecommissioned());
long startTimestamp = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimestamp < 90000
- &&
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+ &&
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId()))
{
Thread.sleep(1000);
}
- Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
// add backend
addNewBackend();
- Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
}
@@ -177,7 +177,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
SystemInfoService infoService = Env.getCurrentSystemInfo();
- ImmutableMap<Long, Backend> idToBackendRef =
infoService.getIdToBackend();
+ ImmutableMap<Long, Backend> idToBackendRef =
infoService.getAllBackendsByAllCluster();
Assertions.assertEquals(backendNum(), idToBackendRef.size());
// 2. create database db3
@@ -220,12 +220,12 @@ public class DecommissionBackendTest extends
TestWithFeService {
long startTimestamp = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimestamp < 90000
- &&
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+ &&
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId()))
{
Thread.sleep(1000);
}
// BE has been dropped successfully
- Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
// tbl1 has been dropped successfully
final String sql = "show create table db3.tbl1;";
@@ -242,7 +242,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
dropTable("db3.tbl1", false);
addNewBackend();
- Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
}
@Test
@@ -250,7 +250,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 1. create connect context
connectContext = createDefaultCtx();
- ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getIdToBackend();
+ ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Assertions.assertEquals(backendNum(), idToBackendRef.size());
// 2. create database db1
@@ -320,11 +320,11 @@ public class DecommissionBackendTest extends
TestWithFeService {
Assertions.assertTrue(srcBackend.isDecommissioned());
long startTimestamp = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimestamp < 90000
- &&
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+ &&
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId()))
{
Thread.sleep(1000);
}
- Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
// For now, we have pre-built internal table: analysis_job and
column_statistics
Assertions.assertEquals(tabletNum,
@@ -336,6 +336,6 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 6. add backend
addNewBackend();
- Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(backendNum(),
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java
index 40c207631e7..c48ba030e77 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java
@@ -311,7 +311,7 @@ public class SystemInfoServiceTest {
DataInputStream dis = new DataInputStream(new BufferedInputStream(new
FileInputStream(file)));
long checksum2 = systemInfoService.loadBackends(dis, 0);
Assert.assertEquals(checksum1, checksum2);
- Assert.assertEquals(1, systemInfoService.getIdToBackend().size());
+ Assert.assertEquals(1,
systemInfoService.getAllBackendsByAllCluster().size());
Backend back2 = systemInfoService.getBackend(1);
Assert.assertEquals(back1, back2);
dis.close();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
index 3a239287342..f03a4282d9a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.common.util;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.persist.EditLog;
@@ -63,7 +64,7 @@ public class AutoBucketUtilsTest {
private static void createClusterWithBackends(int beNum, int diskNum, long
diskCapacity) throws Exception {
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, beNum);
// must set disk info, or the tablet scheduler won't work
- backends = Env.getCurrentSystemInfo().getAllBackends();
+ backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
setDiskInfos(diskNum, diskCapacity, be);
}
@@ -101,7 +102,7 @@ public class AutoBucketUtilsTest {
}
private void expectations(Env env, EditLog editLog, SystemInfoService
systemInfoService,
- ImmutableMap<Long, Backend> backends) {
+ ImmutableMap<Long, Backend> backends) throws AnalysisException {
new Expectations() {
{
Env.getServingEnv();
@@ -112,7 +113,7 @@ public class AutoBucketUtilsTest {
minTimes = 0;
result = systemInfoService;
- systemInfoService.getAllBackendsMap();
+ systemInfoService.getAllBackendsByAllCluster();
minTimes = 0;
result = backends;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 1f4b49b2026..00467541372 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -156,7 +156,7 @@ public class CanalSyncDataTest {
minTimes = 0;
result = backendIds;
- systemInfoService.getIdToBackend();
+ systemInfoService.getAllBackendsByAllCluster();
minTimes = 0;
result = backendMap;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 82f46862674..6933511d4e3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -357,9 +357,10 @@ public class FederationBackendPolicyTest {
int localHostNum = random.nextInt(3 - 1) + 1;
Set<String> localHosts = new HashSet<>();
String localHost;
+ List<Backend> backends =
service.getAllBackendsByAllCluster().values().asList();
for (int j = 0; j < localHostNum; ++j) {
do {
- localHost =
service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+ localHost =
backends.get(random.nextInt(backends.size())).getHost();
} while (!localHosts.add(localHost));
totalLocalHosts.add(localHost);
}
@@ -480,9 +481,10 @@ public class FederationBackendPolicyTest {
int localHostNum = random.nextInt(3 - 1) + 1;
Set<String> localHosts = new HashSet<>();
String localHost;
+ List<Backend> backends =
service.getAllBackendsByAllCluster().values().asList();
for (int j = 0; j < localHostNum; ++j) {
do {
- localHost =
service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+ localHost =
backends.get(random.nextInt(backends.size())).getHost();
} while (!localHosts.add(localHost));
totalLocalHosts.add(localHost);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
index 55ee219db61..207bddae1b3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
@@ -109,7 +109,7 @@ public class ResourceTagQueryTest {
Env.getCurrentEnv().createDb(createDbStmt);
// must set disk info, or the tablet scheduler won't work
- backends = Env.getCurrentSystemInfo().getAllBackends();
+ backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
index 7481e9ffd19..d3238328814 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
@@ -29,8 +29,8 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.BackendsProcDir;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.planner.OlapScanNode;
@@ -80,7 +80,7 @@ public class DemoMultiBackendsTest {
@BeforeClass
public static void beforeClass() throws EnvVarNotSetException, IOException,
- FeStartException, NotInitException, DdlException,
InterruptedException {
+ FeStartException, NotInitException, UserException,
InterruptedException {
FeConstants.runningUnitTest = true;
FeConstants.default_scheduler_interval_millisecond = 100;
Config.tablet_checker_interval_ms = 1000;
@@ -89,7 +89,7 @@ public class DemoMultiBackendsTest {
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 3);
// must set disk info, or the tablet scheduler won't work
- backends = Env.getCurrentSystemInfo().getAllBackends();
+ backends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList();
for (Backend be : backends) {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 2cd27b0968d..ddec61e6d43 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1636,7 +1636,6 @@ class Suite implements GroovyInterceptable {
} else {
endpoint context.config.metaServiceHttpAddress
}
- endpoint context.config.metaServiceHttpAddress
uri "/MetaService/http/drop_cluster?token=${token}"
body request_body
check check_func
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy
new file mode 100644
index 00000000000..13af1209e99
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+suite('test_tvf_in_cloud', 'multi_cluster') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ ]
+ options.cloudMode = true
+
+ def testCase = {
+ for (def i = 0; i < 100; i++) {
+ def ret = sql """select * from numbers("number" = "100")"""
+ assertEquals(ret.size(), 100)
+ test {
+ // cloud mode does not implement this query yet, so the error below is the expected outcome
+ sql """select START_VERSION,END_VERSION from
information_schema.rowsets"""
+ exception "_get_all_rowsets is not implemented"
+ }
+ }
+ }
+
+ docker(options) {
+ def clusterName = "newcluster1"
+ // add a new cluster named clusterName through cluster.addBackend
+ cluster.addBackend(3, clusterName)
+
+ def result = sql """show clusters"""
+ logger.info("show cluster1 : {}", result)
+ def tag = getCloudBeTagByName(clusterName)
+ logger.info("tag = {}", tag)
+
+ def jsonSlurper = new JsonSlurper()
+ def jsonObject = jsonSlurper.parseText(tag)
+ def cloudClusterId = jsonObject.cloud_cluster_id
+ // multi cluster env
+
+ // run the TVF queries against the current cluster
+ testCase.call()
+ // look up the current and the non-current cluster, switch to the latter and rerun the queries
+ def ret = sql_return_maparray """show clusters"""
+ def currentCluster = ret.stream().filter(cluster -> cluster.is_current
== "TRUE").findFirst().orElse(null)
+ def otherCluster = ret.stream().filter(cluster -> cluster.is_current
== "FALSE").findFirst().orElse(null)
+ assertTrue(otherCluster != null)
+ sql """use @${otherCluster.cluster}"""
+ testCase.call()
+
+ // call the meta service HTTP API (drop_cluster) to remove the newly added cluster
+ def ms = cluster.getAllMetaservices().get(0)
+ logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
+ drop_cluster(clusterName, cloudClusterId, ms)
+ Thread.sleep(5000)
+ result = sql """show clusters"""
+ logger.info("show cluster2 : {}", result)
+
+ // single cluster env
+ // the session still points at the cluster that has been dropped, so the query must fail
+ test {
+ sql """select * from numbers("number" = "100")"""
+ exception "in cloud maybe this cluster has been dropped"
+ }
+ // switch back to the original cluster and rerun; NOTE(review): currentCluster is never null-checked
+ sql """use @${currentCluster.cluster}"""
+ testCase.call()
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]