This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6f8b94957ed [improvement](query) prefer to choose tablet on alive disk 
#39467 (#39657)
6f8b94957ed is described below

commit 6f8b94957ed4d4d637144fdc3fbc758c866042c2
Author: yujun <[email protected]>
AuthorDate: Fri Aug 23 12:23:34 2024 +0800

    [improvement](query) prefer to choose tablet on alive disk #39467 (#39657)
    
    cherry pick from #39467
---
 be/src/agent/task_worker_pool.cpp                  | 17 +++--
 be/src/service/doris_main.cpp                      |  2 +
 .../java/org/apache/doris/catalog/DiskInfo.java    |  4 ++
 .../main/java/org/apache/doris/catalog/Tablet.java | 28 +++++---
 .../org/apache/doris/master/ReportHandler.java     | 29 +++++++-
 .../org/apache/doris/planner/OlapScanNode.java     | 14 +++-
 .../main/java/org/apache/doris/system/Backend.java |  5 ++
 .../org/apache/doris/catalog/QueryTabletTest.java  | 84 ++++++++++++++++++++++
 .../java/org/apache/doris/catalog/TabletTest.java  | 13 ++++
 .../doris/cluster/DecommissionBackendTest.java     |  9 ++-
 .../apache/doris/utframe/MockedBackendFactory.java | 31 ++++++++
 .../apache/doris/utframe/TestWithFeService.java    | 56 ++++++++++-----
 12 files changed, 249 insertions(+), 43 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 0b525354da9..35fb7702b98 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -693,10 +693,6 @@ void 
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
         }
 
         _is_doing_work = true;
-        // Random sleep 1~5 seconds before doing report.
-        // In order to avoid the problem that the FE receives many report 
requests at the same time
-        // and can not be processed.
-        _random_sleep(5);
 
         TReportRequest request;
         request.__set_backend(BackendOptions::get_local_backend());
@@ -759,9 +755,16 @@ void 
TaskWorkerPool::_report_tablet_worker_thread_callback() {
         request.__set_backend(BackendOptions::get_local_backend());
         request.__isset.tablets = true;
 
-        uint64_t report_version = _s_report_version;
-        
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
-                &request.tablets);
+        uint64_t report_version;
+        for (int i = 0; i < 5; i++) {
+            request.tablets.clear();
+            report_version = _s_report_version;
+            
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
+                    &request.tablets);
+            if (report_version == _s_report_version) {
+                break;
+            }
+        }
         if (report_version < _s_report_version) {
             // TODO llj This can only reduce the possibility for report error, 
but can't avoid it.
             // If FE create a tablet in FE meta and send CREATE task to this 
BE, the tablet may not be included in this
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 19259b5e3da..3b6cf2969cf 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -545,6 +545,8 @@ int main(int argc, char** argv) {
                                   status.to_string());
     }
 
+    exec_env->storage_engine()->notify_listeners();
+
     while (!doris::k_doris_exit) {
 #if defined(LEAK_SANITIZER)
         __lsan_do_leak_check();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index b49acb2ff83..666f6147e83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -151,6 +151,10 @@ public class DiskInfo implements Writable {
         return pathHash != 0;
     }
 
+    public boolean isAlive() {
+        return state == DiskState.ONLINE;
+    }
+
     public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
         return this.storageMedium == storageMedium;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 8baac8bd71d..a7240895029 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -240,9 +240,11 @@ public class Tablet extends MetaObject implements Writable 
{
     }
 
     // for query
-    public List<Replica> getQueryableReplicas(long visibleVersion, boolean 
allowFailedVersion) {
+    public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, 
Set<Long>> backendAlivePathHashs,
+            boolean allowFailedVersion) {
         List<Replica> allQueryableReplica = 
Lists.newArrayListWithCapacity(replicas.size());
         List<Replica> auxiliaryReplica = 
Lists.newArrayListWithCapacity(replicas.size());
+        List<Replica> deadPathReplica = Lists.newArrayList();
         for (Replica replica : replicas) {
             if (replica.isBad()) {
                 continue;
@@ -253,21 +255,31 @@ public class Tablet extends MetaObject implements 
Writable {
                 continue;
             }
 
+            if (!replica.checkVersionCatchUp(visibleVersion, false)) {
+                continue;
+            }
+
+            Set<Long> thisBeAlivePaths = 
backendAlivePathHashs.get(replica.getBackendId());
             ReplicaState state = replica.getState();
-            if (state.canQuery()) {
-                if (replica.checkVersionCatchUp(visibleVersion, false)) {
-                    allQueryableReplica.add(replica);
-                }
+            // if thisBeAlivePaths contains pathHash = 0, it mean this be 
hadn't report disks state.
+            // should ignore this case.
+            if (replica.getPathHash() != -1 && thisBeAlivePaths != null
+                    && !thisBeAlivePaths.contains(replica.getPathHash())
+                    && !thisBeAlivePaths.contains(0L)) {
+                deadPathReplica.add(replica);
+            } else if (state.canQuery()) {
+                allQueryableReplica.add(replica);
             } else if (state == ReplicaState.DECOMMISSION) {
-                if (replica.checkVersionCatchUp(visibleVersion, false)) {
-                    auxiliaryReplica.add(replica);
-                }
+                auxiliaryReplica.add(replica);
             }
         }
 
         if (allQueryableReplica.isEmpty()) {
             allQueryableReplica = auxiliaryReplica;
         }
+        if (allQueryableReplica.isEmpty()) {
+            allQueryableReplica = deadPathReplica;
+        }
 
         if (Config.skip_compaction_slower_replica && 
allQueryableReplica.size() > 1) {
             long minVersionCount = Long.MAX_VALUE;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 13215503f66..125f2553ec9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -21,6 +21,7 @@ package org.apache.doris.master;
 import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexState;
@@ -723,6 +724,15 @@ public class ReportHandler extends Daemon {
         AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
         Map<Object, Object> objectPool = new HashMap<Object, Object>();
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+        Set<Long> backendHealthPathHashs;
+        if (backend == null) {
+            backendHealthPathHashs = Sets.newHashSet();
+        } else {
+            backendHealthPathHashs = backend.getDisks().values().stream()
+                    .filter(DiskInfo::isAlive)
+                    .map(DiskInfo::getPathHash).collect(Collectors.toSet());
+        }
         for (Long dbId : tabletDeleteFromMeta.keySet()) {
             Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
             if (db == null) {
@@ -782,7 +792,24 @@ public class ReportHandler extends Daemon {
                     long currentBackendReportVersion = 
Env.getCurrentSystemInfo()
                             .getBackendReportVersion(backendId);
                     if (backendReportVersion < currentBackendReportVersion) {
-                        continue;
+
+                        // if backendHealthPathHashs contains health path hash 
0,
+                        // it means this backend hadn't reported disks state,
+                        // should ignore this case.
+                        boolean thisReplicaOnBadDisk = replica.getPathHash() 
!= -1L
+                                && 
!backendHealthPathHashs.contains(replica.getPathHash())
+                                && !backendHealthPathHashs.contains(0L);
+
+                        boolean existsOtherHealthReplica = 
tablet.getReplicas().stream()
+                                .anyMatch(r -> r.getBackendId() != 
replica.getBackendId()
+                                        && r.getVersion() >= 
replica.getVersion()
+                                        && r.getLastFailedVersion() == -1L
+                                        && !r.isBad());
+
+                        // if replica is on bad disks and there are other 
health replicas, still delete it.
+                        if (!(thisReplicaOnBadDisk && 
existsOtherHealthReplica)) {
+                            continue;
+                        }
                     }
 
                     if (state == ReplicaState.NORMAL || state == 
ReplicaState.SCHEMA_CHANGE) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 0509eabeb65..6342e8bdb50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
@@ -721,7 +722,7 @@ public class OlapScanNode extends ScanNode {
     }
 
     private void addScanRangeLocations(Partition partition,
-            List<Tablet> tablets) throws UserException {
+            List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) 
throws UserException {
         long visibleVersion = partition.getVisibleVersion();
         String visibleVersionStr = String.valueOf(visibleVersion);
 
@@ -765,7 +766,8 @@ public class OlapScanNode extends ScanNode {
             paloRange.setTabletId(tabletId);
 
             // random shuffle List && only collect one copy
-            List<Replica> replicas = 
tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
+            List<Replica> replicas = 
tablet.getQueryableReplicas(visibleVersion,
+                    backendAlivePathHashs, skipMissingVersion);
             if (replicas.isEmpty()) {
                 LOG.warn("no queryable replica found in tablet {}. visible 
version {}", tabletId, visibleVersion);
                 StringBuilder sb = new StringBuilder(
@@ -1077,6 +1079,12 @@ public class OlapScanNode extends ScanNode {
          */
         Preconditions.checkState(scanBackendIds.size() == 0);
         Preconditions.checkState(scanTabletIds.size() == 0);
+        Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+        for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+            backendAlivePathHashs.put(backend.getId(), 
backend.getDisks().values().stream()
+                    
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+        }
+
         for (Long partitionId : selectedPartitionIds) {
             final Partition partition = olapTable.getPartition(partitionId);
             final MaterializedIndex selectedTable = 
partition.getIndex(selectedIndexId);
@@ -1114,7 +1122,7 @@ public class OlapScanNode extends ScanNode {
 
             totalTabletsNum += selectedTable.getTablets().size();
             selectedTabletsNum += tablets.size();
-            addScanRangeLocations(partition, tablets);
+            addScanRangeLocations(partition, tablets, backendAlivePathHashs);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 99cd99dca06..c41a70d60ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -172,6 +172,11 @@ public class Backend implements Writable {
         return id;
     }
 
+    // Return ip:heartbeat port
+    public String getAddress() {
+        return host + ":" + heartbeatPort;
+    }
+
     public String getHost() {
         return host;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
new file mode 100644
index 00000000000..b80544245da
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class QueryTabletTest extends TestWithFeService {
+
+    @Override
+    protected int backendNum() {
+        return 3;
+    }
+
+    @Test
+    public void testTabletOnBadDisks() throws Exception {
+        createDatabase("db1");
+        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) 
buckets 1"
+                + " properties('replication_num' = '3')");
+
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+        Assertions.assertNotNull(tbl);
+        Tablet tablet = tbl.getPartitions().iterator().next()
+                
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+                .getTablets().iterator().next();
+
+        List<Replica> replicas = tablet.getReplicas();
+        Assertions.assertEquals(3, replicas.size());
+        for (Replica replica : replicas) {
+            Assertions.assertTrue(replica.getPathHash() != -1L);
+        }
+
+        Assertions.assertEquals(replicas,
+                tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+
+        // disk mark as bad
+        Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId())
+                .getDisks().values().forEach(disk -> 
disk.setState(DiskInfo.DiskState.OFFLINE));
+
+        // lost disk
+        replicas.get(1).setPathHash(-123321L);
+
+        Assertions.assertEquals(Lists.newArrayList(replicas.get(2)),
+                tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+    }
+
+    private Map<Long, Set<Long>> getAlivePathHashs() {
+        Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+        for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
+            backendAlivePathHashs.put(backend.getId(), 
backend.getDisks().values().stream()
+                    
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+        }
+
+        return backendAlivePathHashs;
+    }
+
+}
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index d7fdb2694a8..99769a6b525 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -20,6 +20,8 @@ package org.apache.doris.catalog;
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.Sets;
@@ -42,6 +44,7 @@ public class TabletTest {
     private Replica replica3;
 
     private TabletInvertedIndex invertedIndex;
+    private SystemInfoService  infoService;
 
     @Mocked
     private Env env;
@@ -49,6 +52,12 @@ public class TabletTest {
     @Before
     public void makeTablet() {
         invertedIndex = new TabletInvertedIndex();
+        infoService = new SystemInfoService();
+        for (long beId = 1L; beId <= 4L; beId++) {
+            Backend be = new Backend(beId, "127.0.0." + beId, 8030);
+            be.setAlive(true);
+            infoService.addBackend(be);
+        }
         new Expectations(env) {
             {
                 Env.getCurrentEnvJournalVersion();
@@ -59,6 +68,10 @@ public class TabletTest {
                 minTimes = 0;
                 result = invertedIndex;
 
+                Env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = infoService;
+
                 Env.isCheckpointThread();
                 minTimes = 0;
                 result = false;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index ff5f5292be8..e67cbccb668 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -97,8 +97,8 @@ public class DecommissionBackendTest extends 
TestWithFeService {
             }
         }
 
-        Assertions.assertTrue(srcBackend != null);
-        String decommissionStmtStr = "alter system decommission backend 
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+        Assertions.assertNotNull(srcBackend);
+        String decommissionStmtStr = "alter system decommission backend \"" + 
srcBackend.getAddress() + "\"";
         AlterSystemStmt decommissionStmt = (AlterSystemStmt) 
parseAndAnalyzeStmt(decommissionStmtStr);
         
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
 
@@ -163,7 +163,7 @@ public class DecommissionBackendTest extends 
TestWithFeService {
         dropTable("db2.tbl1", false);
 
         // 6. execute decommission
-        String decommissionStmtStr = "alter system decommission backend 
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+        String decommissionStmtStr = "alter system decommission backend \"" + 
srcBackend.getAddress() + "\"";
         AlterSystemStmt decommissionStmt = (AlterSystemStmt) 
parseAndAnalyzeStmt(decommissionStmtStr);
         
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
         Assertions.assertEquals(true, srcBackend.isDecommissioned());
@@ -240,8 +240,7 @@ public class DecommissionBackendTest extends 
TestWithFeService {
         // 4. query tablet num
         int tabletNum = 
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
 
-        String decommissionStmtStr = "alter system decommission backend 
\"127.0.0.1:"
-                + srcBackend.getHeartbeatPort() + "\"";
+        String decommissionStmtStr = "alter system decommission backend \"" + 
srcBackend.getAddress() + "\"";
         AlterSystemStmt decommissionStmt = (AlterSystemStmt) 
parseAndAnalyzeStmt(decommissionStmtStr);
         
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 665dc8163ae..8feff517c79 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams;
 import org.apache.doris.thrift.TCancelPlanFragmentResult;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
 import org.apache.doris.thrift.TCloneReq;
+import org.apache.doris.thrift.TCreateTabletReq;
 import org.apache.doris.thrift.TDiskTrashInfo;
 import org.apache.doris.thrift.TDropTabletReq;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -81,7 +82,9 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
 
 /*
  * This class is used to create mock backends.
@@ -184,6 +187,9 @@ public class MockedBackendFactory {
                             TTaskType taskType = request.getTaskType();
                             switch (taskType) {
                                 case CREATE:
+                                    ++reportVersion;
+                                    handleCreateTablet(request, 
finishTaskRequest);
+                                    break;
                                 case ALTER:
                                     ++reportVersion;
                                     break;
@@ -191,6 +197,7 @@ public class MockedBackendFactory {
                                     handleDropTablet(request, 
finishTaskRequest);
                                     break;
                                 case CLONE:
+                                    ++reportVersion;
                                     handleCloneTablet(request, 
finishTaskRequest);
                                     break;
                                 case STORAGE_MEDIUM_MIGRATE:
@@ -216,6 +223,30 @@ public class MockedBackendFactory {
                     }
                 }
 
+                private void handleCreateTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
+                    TCreateTabletReq req = request.getCreateTabletReq();
+                    List<DiskInfo> candDisks = 
backendInFe.getDisks().values().stream()
+                            .filter(disk -> req.storage_medium == 
disk.getStorageMedium() && disk.isAlive())
+                            .collect(Collectors.toList());
+                    if (candDisks.isEmpty()) {
+                        candDisks = backendInFe.getDisks().values().stream()
+                                .filter(DiskInfo::isAlive)
+                                .collect(Collectors.toList());
+                    }
+                    DiskInfo choseDisk = candDisks.isEmpty() ? null
+                            : candDisks.get(new 
Random().nextInt(candDisks.size()));
+
+                    List<TTabletInfo> tabletInfos = Lists.newArrayList();
+                    TTabletInfo tabletInfo = new TTabletInfo();
+                    tabletInfo.setTabletId(req.tablet_id);
+                    tabletInfo.setVersion(req.version);
+                    tabletInfo.setPathHash(choseDisk == null ? -1L : 
choseDisk.getPathHash());
+                    tabletInfo.setReplicaId(req.replica_id);
+                    tabletInfo.setUsed(true);
+                    tabletInfos.add(tabletInfo);
+                    finishTaskRequest.setFinishTabletInfos(tabletInfos);
+                }
+
                 private void handleDropTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
                     TDropTabletReq req = request.getDropTabletReq();
                     long dataSize = Math.max(1, 
CatalogTestUtil.getTabletDataSize(req.tablet_id));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 59e4eae0a37..db77fcd07ec 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -386,7 +386,7 @@ public abstract class TestWithFeService {
     protected void createDorisCluster()
             throws InterruptedException, NotInitException, IOException, 
DdlException, EnvVarNotSetException,
             FeStartException {
-        createDorisCluster(runningDir, backendNum());
+        createDorisClusterWithMultiTag(runningDir, backendNum());
     }
 
     protected void createDorisCluster(String runningDir, int backendNum)
@@ -399,26 +399,13 @@ public abstract class TestWithFeService {
             bes.add(createBackend("127.0.0.1", feRpcPort));
         }
         System.out.println("after create backend");
-        checkBEHeartbeat(bes);
+        if (!checkBEHeartbeat(bes)) {
+            System.out.println("Some backends dead, all backends: " + bes);
+        }
         // Thread.sleep(2000);
         System.out.println("after create backend2");
     }
 
-    private void checkBEHeartbeat(List<Backend> bes) throws 
InterruptedException {
-        int maxTry = Config.heartbeat_interval_second + 5;
-        boolean allAlive = false;
-        while (maxTry-- > 0 && !allAlive) {
-            Thread.sleep(1000);
-            boolean hasDead = false;
-            for (Backend be : bes) {
-                if (!be.isAlive()) {
-                    hasDead = true;
-                }
-            }
-            allAlive = !hasDead;
-        }
-    }
-
     // Create multi backends with different host for unit test.
     // the host of BE will be "127.0.0.1", "127.0.0.2"
     protected void createDorisClusterWithMultiTag(String runningDir, int 
backendNum)
@@ -426,14 +413,45 @@ public abstract class TestWithFeService {
             InterruptedException {
         // set runningUnitTest to true, so that for ut, the agent task will be 
send to "127.0.0.1"
         // to make cluster running well.
-        FeConstants.runningUnitTest = true;
+        if (backendNum > 1) {
+            FeConstants.runningUnitTest = true;
+        }
         int feRpcPort = startFEServer(runningDir);
         List<Backend> bes = Lists.newArrayList();
+        System.out.println("start create backend, backend num " + backendNum);
         for (int i = 0; i < backendNum; i++) {
             String host = "127.0.0." + (i + 1);
             bes.add(createBackend(host, feRpcPort));
         }
-        checkBEHeartbeat(bes);
+        System.out.println("after create backend");
+        if (!checkBEHeartbeat(bes)) {
+            System.out.println("Some backends dead, all backends: " + bes);
+        }
+        System.out.println("after create backend2");
+    }
+
+    protected boolean checkBEHeartbeat(List<Backend> bes) {
+        return checkBEHeartbeatStatus(bes, true);
+    }
+
+    protected boolean checkBELostHeartbeat(List<Backend> bes) {
+        return checkBEHeartbeatStatus(bes, false);
+    }
+
+    private boolean checkBEHeartbeatStatus(List<Backend> bes, boolean isAlive) 
{
+        int maxTry = Config.heartbeat_interval_second + 2;
+        while (maxTry-- > 0) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // no exception
+            }
+            if (bes.stream().allMatch(be -> be.isAlive() == isAlive)) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     protected Backend addNewBackend() throws IOException, InterruptedException 
{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to