This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6f8b94957ed [improvement](query) prefer to choose tablet on alive disk
#39467 (#39657)
6f8b94957ed is described below
commit 6f8b94957ed4d4d637144fdc3fbc758c866042c2
Author: yujun <[email protected]>
AuthorDate: Fri Aug 23 12:23:34 2024 +0800
[improvement](query) prefer to choose tablet on alive disk #39467 (#39657)
Cherry-picked from #39467
---
be/src/agent/task_worker_pool.cpp | 17 +++--
be/src/service/doris_main.cpp | 2 +
.../java/org/apache/doris/catalog/DiskInfo.java | 4 ++
.../main/java/org/apache/doris/catalog/Tablet.java | 28 +++++---
.../org/apache/doris/master/ReportHandler.java | 29 +++++++-
.../org/apache/doris/planner/OlapScanNode.java | 14 +++-
.../main/java/org/apache/doris/system/Backend.java | 5 ++
.../org/apache/doris/catalog/QueryTabletTest.java | 84 ++++++++++++++++++++++
.../java/org/apache/doris/catalog/TabletTest.java | 13 ++++
.../doris/cluster/DecommissionBackendTest.java | 9 ++-
.../apache/doris/utframe/MockedBackendFactory.java | 31 ++++++++
.../apache/doris/utframe/TestWithFeService.java | 56 ++++++++++-----
12 files changed, 249 insertions(+), 43 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 0b525354da9..35fb7702b98 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -693,10 +693,6 @@ void
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
}
_is_doing_work = true;
- // Random sleep 1~5 seconds before doing report.
- // In order to avoid the problem that the FE receives many report
requests at the same time
- // and can not be processed.
- _random_sleep(5);
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
@@ -759,9 +755,16 @@ void
TaskWorkerPool::_report_tablet_worker_thread_callback() {
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;
- uint64_t report_version = _s_report_version;
-
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
- &request.tablets);
+ uint64_t report_version;
+ for (int i = 0; i < 5; i++) {
+ request.tablets.clear();
+ report_version = _s_report_version;
+
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
+ &request.tablets);
+ if (report_version == _s_report_version) {
+ break;
+ }
+ }
if (report_version < _s_report_version) {
// TODO llj This can only reduce the possibility for report error,
but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this
BE, the tablet may not be included in this
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 19259b5e3da..3b6cf2969cf 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -545,6 +545,8 @@ int main(int argc, char** argv) {
status.to_string());
}
+ exec_env->storage_engine()->notify_listeners();
+
while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index b49acb2ff83..666f6147e83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -151,6 +151,10 @@ public class DiskInfo implements Writable {
return pathHash != 0;
}
+ /**
+ * Returns true only when this disk's state is {@code DiskState.ONLINE};
+ * any other state (e.g. {@code OFFLINE}) counts as not alive.
+ */
+ public boolean isAlive() {
+ return state == DiskState.ONLINE;
+ }
+
public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
return this.storageMedium == storageMedium;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 8baac8bd71d..a7240895029 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -240,9 +240,11 @@ public class Tablet extends MetaObject implements Writable
{
}
// for query
- public List<Replica> getQueryableReplicas(long visibleVersion, boolean
allowFailedVersion) {
+ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long,
Set<Long>> backendAlivePathHashs,
+ boolean allowFailedVersion) {
List<Replica> allQueryableReplica =
Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica =
Lists.newArrayListWithCapacity(replicas.size());
+ List<Replica> deadPathReplica = Lists.newArrayList();
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
@@ -253,21 +255,31 @@ public class Tablet extends MetaObject implements
Writable {
continue;
}
+ if (!replica.checkVersionCatchUp(visibleVersion, false)) {
+ continue;
+ }
+
+ Set<Long> thisBeAlivePaths =
backendAlivePathHashs.get(replica.getBackendId());
ReplicaState state = replica.getState();
- if (state.canQuery()) {
- if (replica.checkVersionCatchUp(visibleVersion, false)) {
- allQueryableReplica.add(replica);
- }
+ // if thisBeAlivePaths contains pathHash = 0, it mean this be
hadn't report disks state.
+ // should ignore this case.
+ if (replica.getPathHash() != -1 && thisBeAlivePaths != null
+ && !thisBeAlivePaths.contains(replica.getPathHash())
+ && !thisBeAlivePaths.contains(0L)) {
+ deadPathReplica.add(replica);
+ } else if (state.canQuery()) {
+ allQueryableReplica.add(replica);
} else if (state == ReplicaState.DECOMMISSION) {
- if (replica.checkVersionCatchUp(visibleVersion, false)) {
- auxiliaryReplica.add(replica);
- }
+ auxiliaryReplica.add(replica);
}
}
if (allQueryableReplica.isEmpty()) {
allQueryableReplica = auxiliaryReplica;
}
+ if (allQueryableReplica.isEmpty()) {
+ allQueryableReplica = deadPathReplica;
+ }
if (Config.skip_compaction_slower_replica &&
allQueryableReplica.size() > 1) {
long minVersionCount = Long.MAX_VALUE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 13215503f66..125f2553ec9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -21,6 +21,7 @@ package org.apache.doris.master;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
@@ -723,6 +724,15 @@ public class ReportHandler extends Daemon {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
+ Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ Set<Long> backendHealthPathHashs;
+ if (backend == null) {
+ backendHealthPathHashs = Sets.newHashSet();
+ } else {
+ backendHealthPathHashs = backend.getDisks().values().stream()
+ .filter(DiskInfo::isAlive)
+ .map(DiskInfo::getPathHash).collect(Collectors.toSet());
+ }
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
@@ -782,7 +792,24 @@ public class ReportHandler extends Daemon {
long currentBackendReportVersion =
Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId);
if (backendReportVersion < currentBackendReportVersion) {
- continue;
+
+ // if backendHealthPathHashs contains health path hash
0,
+ // it means this backend hadn't reported disks state,
+ // should ignore this case.
+ boolean thisReplicaOnBadDisk = replica.getPathHash()
!= -1L
+ &&
!backendHealthPathHashs.contains(replica.getPathHash())
+ && !backendHealthPathHashs.contains(0L);
+
+ boolean existsOtherHealthReplica =
tablet.getReplicas().stream()
+ .anyMatch(r -> r.getBackendId() !=
replica.getBackendId()
+ && r.getVersion() >=
replica.getVersion()
+ && r.getLastFailedVersion() == -1L
+ && !r.isBad());
+
+ // if replica is on bad disks and there are other
health replicas, still delete it.
+ if (!(thisReplicaOnBadDisk &&
existsOtherHealthReplica)) {
+ continue;
+ }
}
if (state == ReplicaState.NORMAL || state ==
ReplicaState.SCHEMA_CHANGE) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0509eabeb65..6342e8bdb50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
@@ -721,7 +722,7 @@ public class OlapScanNode extends ScanNode {
}
private void addScanRangeLocations(Partition partition,
- List<Tablet> tablets) throws UserException {
+ List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs)
throws UserException {
long visibleVersion = partition.getVisibleVersion();
String visibleVersionStr = String.valueOf(visibleVersion);
@@ -765,7 +766,8 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
- List<Replica> replicas =
tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
+ List<Replica> replicas =
tablet.getQueryableReplicas(visibleVersion,
+ backendAlivePathHashs, skipMissingVersion);
if (replicas.isEmpty()) {
LOG.warn("no queryable replica found in tablet {}. visible
version {}", tabletId, visibleVersion);
StringBuilder sb = new StringBuilder(
@@ -1077,6 +1079,12 @@ public class OlapScanNode extends ScanNode {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
+ Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+ for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+ backendAlivePathHashs.put(backend.getId(),
backend.getDisks().values().stream()
+
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+ }
+
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable =
partition.getIndex(selectedIndexId);
@@ -1114,7 +1122,7 @@ public class OlapScanNode extends ScanNode {
totalTabletsNum += selectedTable.getTablets().size();
selectedTabletsNum += tablets.size();
- addScanRangeLocations(partition, tablets);
+ addScanRangeLocations(partition, tablets, backendAlivePathHashs);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 99cd99dca06..c41a70d60ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -172,6 +172,11 @@ public class Backend implements Writable {
return id;
}
+ // Returns "<host>:<heartbeatPort>", using the host field as-is.
+ // NOTE(review): host may be a hostname rather than a literal IP; confirm before relying on it.
+ public String getAddress() {
+ return host + ":" + heartbeatPort;
+ }
+
public String getHost() {
return host;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
new file mode 100644
index 00000000000..b80544245da
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Verifies that Tablet#getQueryableReplicas skips replicas whose disk is not alive.
+ * It covers a replica on a backend whose disks are all OFFLINE and a replica whose
+ * path hash matches no reported disk; only the remaining replica should be queryable.
+ */
+public class QueryTabletTest extends TestWithFeService {
+
+ // Three backends, so a 3-replica table puts one replica on each backend.
+ @Override
+ protected int backendNum() {
+ return 3;
+ }
+
+ @Test
+ public void testTabletOnBadDisks() throws Exception {
+ createDatabase("db1");
+ createTable("create table db1.tbl1(k1 int) distributed by hash(k1)
buckets 1"
+ + " properties('replication_num' = '3')");
+
+ Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+ Assertions.assertNotNull(tbl);
+ // The table has a single bucket, so take the only tablet of the first partition/index.
+ Tablet tablet = tbl.getPartitions().iterator().next()
+
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+ // Every replica must have been placed on a concrete disk (path hash != -1).
+ List<Replica> replicas = tablet.getReplicas();
+ Assertions.assertEquals(3, replicas.size());
+ for (Replica replica : replicas) {
+ Assertions.assertTrue(replica.getPathHash() != -1L);
+ }
+
+ // With all disks online, every replica is queryable at visible version 1.
+ Assertions.assertEquals(replicas,
+ tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+
+ // Mark every disk on replica 0's backend OFFLINE, so none of its path hashes is alive.
+ Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId())
+ .getDisks().values().forEach(disk ->
disk.setState(DiskInfo.DiskState.OFFLINE));
+
+ // Point replica 1 at a path hash that no backend reports, simulating a lost disk.
+ replicas.get(1).setPathHash(-123321L);
+
+ // Only replica 2, which is still on an alive disk, should be returned.
+ Assertions.assertEquals(Lists.newArrayList(replicas.get(2)),
+ tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+ }
+
+ // Builds a map from backend id to the path hashes of that backend's alive disks.
+ // This is the same shape OlapScanNode builds and passes to getQueryableReplicas.
+ private Map<Long, Set<Long>> getAlivePathHashs() {
+ Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+ for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+ backendAlivePathHashs.put(backend.getId(),
backend.getDisks().values().stream()
+
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+ }
+
+ return backendAlivePathHashs;
+ }
+
+}
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index d7fdb2694a8..99769a6b525 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -20,6 +20,8 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.Sets;
@@ -42,6 +44,7 @@ public class TabletTest {
private Replica replica3;
private TabletInvertedIndex invertedIndex;
+ private SystemInfoService infoService;
@Mocked
private Env env;
@@ -49,6 +52,12 @@ public class TabletTest {
@Before
public void makeTablet() {
invertedIndex = new TabletInvertedIndex();
+ infoService = new SystemInfoService();
+ for (long beId = 1L; beId <= 4L; beId++) {
+ Backend be = new Backend(beId, "127.0.0." + beId, 8030);
+ be.setAlive(true);
+ infoService.addBackend(be);
+ }
new Expectations(env) {
{
Env.getCurrentEnvJournalVersion();
@@ -59,6 +68,10 @@ public class TabletTest {
minTimes = 0;
result = invertedIndex;
+ Env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = infoService;
+
Env.isCheckpointThread();
minTimes = 0;
result = false;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index ff5f5292be8..e67cbccb668 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -97,8 +97,8 @@ public class DecommissionBackendTest extends
TestWithFeService {
}
}
- Assertions.assertTrue(srcBackend != null);
- String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+ Assertions.assertNotNull(srcBackend);
+ String decommissionStmtStr = "alter system decommission backend \"" +
srcBackend.getAddress() + "\"";
AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
@@ -163,7 +163,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
dropTable("db2.tbl1", false);
// 6. execute decommission
- String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+ String decommissionStmtStr = "alter system decommission backend \"" +
srcBackend.getAddress() + "\"";
AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
Assertions.assertEquals(true, srcBackend.isDecommissioned());
@@ -240,8 +240,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 4. query tablet num
int tabletNum =
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
- String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:"
- + srcBackend.getHeartbeatPort() + "\"";
+ String decommissionStmtStr = "alter system decommission backend \"" +
srcBackend.getAddress() + "\"";
AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 665dc8163ae..8feff517c79 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCloneReq;
+import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -81,7 +82,9 @@ import org.apache.thrift.TException;
import java.io.IOException;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
/*
* This class is used to create mock backends.
@@ -184,6 +187,9 @@ public class MockedBackendFactory {
TTaskType taskType = request.getTaskType();
switch (taskType) {
case CREATE:
+ ++reportVersion;
+ handleCreateTablet(request,
finishTaskRequest);
+ break;
case ALTER:
++reportVersion;
break;
@@ -191,6 +197,7 @@ public class MockedBackendFactory {
handleDropTablet(request,
finishTaskRequest);
break;
case CLONE:
+ ++reportVersion;
handleCloneTablet(request,
finishTaskRequest);
break;
case STORAGE_MEDIUM_MIGRATE:
@@ -216,6 +223,30 @@ public class MockedBackendFactory {
}
}
+ // Simulates BE-side handling of a CREATE task. It picks a random alive disk,
+ // preferring one whose storage medium matches the request, and reports the created
+ // tablet, including the chosen disk's path hash, through finishTaskRequest.
+ private void handleCreateTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
+ TCreateTabletReq req = request.getCreateTabletReq();
+ List<DiskInfo> candDisks =
backendInFe.getDisks().values().stream()
+ .filter(disk -> req.storage_medium ==
disk.getStorageMedium() && disk.isAlive())
+ .collect(Collectors.toList());
+ // No alive disk with the requested medium: fall back to any alive disk.
+ if (candDisks.isEmpty()) {
+ candDisks = backendInFe.getDisks().values().stream()
+ .filter(DiskInfo::isAlive)
+ .collect(Collectors.toList());
+ }
+ // NOTE(review): a new Random is created on every call; a shared instance would suffice.
+ DiskInfo choseDisk = candDisks.isEmpty() ? null
+ : candDisks.get(new
Random().nextInt(candDisks.size()));
+
+ List<TTabletInfo> tabletInfos = Lists.newArrayList();
+ TTabletInfo tabletInfo = new TTabletInfo();
+ tabletInfo.setTabletId(req.tablet_id);
+ tabletInfo.setVersion(req.version);
+ // Path hash -1 is reported when no alive disk was found at all.
+ tabletInfo.setPathHash(choseDisk == null ? -1L :
choseDisk.getPathHash());
+ tabletInfo.setReplicaId(req.replica_id);
+ tabletInfo.setUsed(true);
+ tabletInfos.add(tabletInfo);
+ finishTaskRequest.setFinishTabletInfos(tabletInfos);
+ }
+
private void handleDropTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
TDropTabletReq req = request.getDropTabletReq();
long dataSize = Math.max(1,
CatalogTestUtil.getTabletDataSize(req.tablet_id));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 59e4eae0a37..db77fcd07ec 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -386,7 +386,7 @@ public abstract class TestWithFeService {
protected void createDorisCluster()
throws InterruptedException, NotInitException, IOException,
DdlException, EnvVarNotSetException,
FeStartException {
- createDorisCluster(runningDir, backendNum());
+ createDorisClusterWithMultiTag(runningDir, backendNum());
}
protected void createDorisCluster(String runningDir, int backendNum)
@@ -399,26 +399,13 @@ public abstract class TestWithFeService {
bes.add(createBackend("127.0.0.1", feRpcPort));
}
System.out.println("after create backend");
- checkBEHeartbeat(bes);
+ if (!checkBEHeartbeat(bes)) {
+ System.out.println("Some backends dead, all backends: " + bes);
+ }
// Thread.sleep(2000);
System.out.println("after create backend2");
}
- private void checkBEHeartbeat(List<Backend> bes) throws
InterruptedException {
- int maxTry = Config.heartbeat_interval_second + 5;
- boolean allAlive = false;
- while (maxTry-- > 0 && !allAlive) {
- Thread.sleep(1000);
- boolean hasDead = false;
- for (Backend be : bes) {
- if (!be.isAlive()) {
- hasDead = true;
- }
- }
- allAlive = !hasDead;
- }
- }
-
// Create multi backends with different host for unit test.
// the host of BE will be "127.0.0.1", "127.0.0.2"
protected void createDorisClusterWithMultiTag(String runningDir, int
backendNum)
@@ -426,14 +413,45 @@ public abstract class TestWithFeService {
InterruptedException {
// set runningUnitTest to true, so that for ut, the agent task will be
send to "127.0.0.1"
// to make cluster running well.
- FeConstants.runningUnitTest = true;
+ if (backendNum > 1) {
+ FeConstants.runningUnitTest = true;
+ }
int feRpcPort = startFEServer(runningDir);
List<Backend> bes = Lists.newArrayList();
+ System.out.println("start create backend, backend num " + backendNum);
for (int i = 0; i < backendNum; i++) {
String host = "127.0.0." + (i + 1);
bes.add(createBackend(host, feRpcPort));
}
- checkBEHeartbeat(bes);
+ System.out.println("after create backend");
+ if (!checkBEHeartbeat(bes)) {
+ System.out.println("Some backends dead, all backends: " + bes);
+ }
+ System.out.println("after create backend2");
+ }
+
+    /**
+     * Waits until every backend in {@code bes} reports alive.
+     *
+     * @return true if all backends were alive before the retry budget ran out
+     */
+    protected boolean checkBEHeartbeat(List<Backend> bes) {
+        return checkBEHeartbeatStatus(bes, true);
+    }
+
+    /**
+     * Waits until every backend in {@code bes} reports not alive.
+     *
+     * @return true if all backends were marked dead before the retry budget ran out
+     */
+    protected boolean checkBELostHeartbeat(List<Backend> bes) {
+        return checkBEHeartbeatStatus(bes, false);
+    }
+
+    // Polls once per second, up to (heartbeat_interval_second + 2) times, until every
+    // backend's isAlive() equals the expected value. Returns false if the budget runs out.
+    // If the waiting thread is interrupted, the interrupt flag is restored and polling
+    // stops early with one final status check instead of silently continuing to sleep.
+    private boolean checkBEHeartbeatStatus(List<Backend> bes, boolean isAlive) {
+        int maxTry = Config.heartbeat_interval_second + 2;
+        while (maxTry-- > 0) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt so the caller or test runner can observe it.
+                Thread.currentThread().interrupt();
+                return bes.stream().allMatch(be -> be.isAlive() == isAlive);
+            }
+            if (bes.stream().allMatch(be -> be.isAlive() == isAlive)) {
+                return true;
+            }
+        }
+
+        return false;
+     }
protected Backend addNewBackend() throws IOException, InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]