This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5280c18100a [improvement](query) prefer to chose tablet on alive disk
(#39467)
5280c18100a is described below
commit 5280c18100afac7e531a9443c03319a9df389ead
Author: yujun <[email protected]>
AuthorDate: Tue Aug 20 23:01:53 2024 +0800
[improvement](query) prefer to chose tablet on alive disk (#39467)
improvement:
1. when querying, prefer to choose tablets on alive disks;
2. when a BE reports tablets, if the report version has fallen behind,
retry building the report;
3. when a BE restarts, it reports its tablets and disks immediately instead
of waiting 1 minute;
4. when the FE handles a tablet report, even if the report is stale, if
other healthy replicas exist and this tablet is on a bad disk, it still
processes this tablet;
---
be/src/agent/task_worker_pool.cpp | 19 ++---
be/src/service/doris_main.cpp | 2 +
.../java/org/apache/doris/catalog/DiskInfo.java | 4 ++
.../main/java/org/apache/doris/catalog/Tablet.java | 28 +++++---
.../org/apache/doris/master/ReportHandler.java | 29 +++++++-
.../org/apache/doris/planner/OlapScanNode.java | 14 +++-
.../org/apache/doris/system/SystemInfoService.java | 2 +-
.../org/apache/doris/catalog/QueryTabletTest.java | 84 ++++++++++++++++++++++
.../apache/doris/utframe/MockedBackendFactory.java | 31 ++++++++
9 files changed, 191 insertions(+), 22 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index a02f1761463..27921888774 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1009,13 +1009,6 @@ void report_task_callback(const TMasterInfo&
master_info) {
}
void report_disk_callback(StorageEngine& engine, const TMasterInfo&
master_info) {
- // Random sleep 1~5 seconds before doing report.
- // In order to avoid the problem that the FE receives many report requests
at the same time
- // and can not be processed.
- if (config::report_random_wait) {
- random_sleep(5);
- }
-
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
@@ -1081,8 +1074,16 @@ void report_tablet_callback(StorageEngine& engine, const
TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;
- uint64_t report_version = s_report_version;
- engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
+ uint64_t report_version;
+ for (int i = 0; i < 5; i++) {
+ request.tablets.clear();
+ report_version = s_report_version;
+
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
+ if (report_version == s_report_version) {
+ break;
+ }
+ }
+
if (report_version < s_report_version) {
// TODO llj This can only reduce the possibility for report error, but
can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE,
the tablet may not be included in this
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 92d3452dcb1..dcc76259868 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -602,6 +602,8 @@ int main(int argc, char** argv) {
stop_work_if_error(
status, "Arrow Flight Service did not start correctly, exiting, "
+ status.to_string());
+ exec_env->storage_engine().notify_listeners();
+
while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index 934e7f75fb0..38d8037befc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -151,6 +151,10 @@ public class DiskInfo implements Writable {
return pathHash != 0;
}
+ public boolean isAlive() {
+ return state == DiskState.ONLINE;
+ }
+
public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
return this.storageMedium == storageMedium;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 68aa70a4039..4102f4f117e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -304,9 +304,11 @@ public class Tablet extends MetaObject {
}
// for query
- public List<Replica> getQueryableReplicas(long visibleVersion, boolean
allowFailedVersion) {
+ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long,
Set<Long>> backendAlivePathHashs,
+ boolean allowFailedVersion) {
List<Replica> allQueryableReplica =
Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica =
Lists.newArrayListWithCapacity(replicas.size());
+ List<Replica> deadPathReplica = Lists.newArrayList();
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
@@ -317,21 +319,31 @@ public class Tablet extends MetaObject {
continue;
}
+ if (!replica.checkVersionCatchUp(visibleVersion, false)) {
+ continue;
+ }
+
+ Set<Long> thisBeAlivePaths =
backendAlivePathHashs.get(replica.getBackendId());
ReplicaState state = replica.getState();
- if (state.canQuery()) {
- if (replica.checkVersionCatchUp(visibleVersion, false)) {
- allQueryableReplica.add(replica);
- }
+ // if thisBeAlivePaths contains pathHash = 0, it mean this be
hadn't report disks state.
+ // should ignore this case.
+ if (replica.getPathHash() != -1 && thisBeAlivePaths != null
+ && !thisBeAlivePaths.contains(replica.getPathHash())
+ && !thisBeAlivePaths.contains(0L)) {
+ deadPathReplica.add(replica);
+ } else if (state.canQuery()) {
+ allQueryableReplica.add(replica);
} else if (state == ReplicaState.DECOMMISSION) {
- if (replica.checkVersionCatchUp(visibleVersion, false)) {
- auxiliaryReplica.add(replica);
- }
+ auxiliaryReplica.add(replica);
}
}
if (allQueryableReplica.isEmpty()) {
allQueryableReplica = auxiliaryReplica;
}
+ if (allQueryableReplica.isEmpty()) {
+ allQueryableReplica = deadPathReplica;
+ }
if (Config.skip_compaction_slower_replica &&
allQueryableReplica.size() > 1) {
long minVersionCount =
allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 89bc9a6e522..06b560ab362 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.MaterializedIndex;
@@ -822,6 +823,15 @@ public class ReportHandler extends Daemon {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
+ Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ Set<Long> backendHealthPathHashs;
+ if (backend == null) {
+ backendHealthPathHashs = Sets.newHashSet();
+ } else {
+ backendHealthPathHashs = backend.getDisks().values().stream()
+ .filter(DiskInfo::isAlive)
+ .map(DiskInfo::getPathHash).collect(Collectors.toSet());
+ }
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
@@ -877,7 +887,24 @@ public class ReportHandler extends Daemon {
long currentBackendReportVersion =
Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId);
if (backendReportVersion < currentBackendReportVersion) {
- continue;
+
+ // if backendHealthPathHashs contains health path hash
0,
+ // it means this backend hadn't reported disks state,
+ // should ignore this case.
+ boolean thisReplicaOnBadDisk = replica.getPathHash()
!= -1L
+ &&
!backendHealthPathHashs.contains(replica.getPathHash())
+ && !backendHealthPathHashs.contains(0L);
+
+ boolean existsOtherHealthReplica =
tablet.getReplicas().stream()
+ .anyMatch(r -> r.getBackendId() !=
replica.getBackendId()
+ && r.getVersion() >=
replica.getVersion()
+ && r.getLastFailedVersion() == -1L
+ && !r.isBad());
+
+ // if replica is on bad disks and there are other
health replicas, still delete it.
+ if (!(thisReplicaOnBadDisk &&
existsOtherHealthReplica)) {
+ continue;
+ }
}
BinlogConfig binlogConfig = new
BinlogConfig(olapTable.getBinlogConfig());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index dffbba37cfe..8e5ab5cdf0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
@@ -751,7 +752,7 @@ public class OlapScanNode extends ScanNode {
}
private void addScanRangeLocations(Partition partition,
- List<Tablet> tablets) throws UserException {
+ List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs)
throws UserException {
long visibleVersion = Partition.PARTITION_INIT_VERSION;
// For cloud mode, set scan range visible version in Coordinator.exec
so that we could
@@ -804,7 +805,8 @@ public class OlapScanNode extends ScanNode {
//
// ATTN: visibleVersion is not used in cloud mode, see
CloudReplica.checkVersionCatchup
// for details.
- List<Replica> replicas =
tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
+ List<Replica> replicas =
tablet.getQueryableReplicas(visibleVersion,
+ backendAlivePathHashs, skipMissingVersion);
if (replicas.isEmpty()) {
if (ConnectContext.get().getSessionVariable().skipBadTablet) {
continue;
@@ -1168,6 +1170,12 @@ public class OlapScanNode extends ScanNode {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
+ Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+ for (Backend backend :
Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
+ backendAlivePathHashs.put(backend.getId(),
backend.getDisks().values().stream()
+
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+ }
+
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable =
partition.getIndex(selectedIndexId);
@@ -1209,7 +1217,7 @@ public class OlapScanNode extends ScanNode {
totalTabletsNum += selectedTable.getTablets().size();
selectedSplitNum += tablets.size();
- addScanRangeLocations(partition, tablets);
+ addScanRangeLocations(partition, tablets, backendAlivePathHashs);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 836d516c942..f81d8b4d7b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -799,7 +799,7 @@ public class SystemInfoService {
}
}
- private ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
+ public ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
try {
return getAllBackendsByAllCluster();
} catch (AnalysisException e) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
new file mode 100644
index 00000000000..32929523a53
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class QueryTabletTest extends TestWithFeService {
+
+ @Override
+ protected int backendNum() {
+ return 3;
+ }
+
+ @Test
+ public void testTabletOnBadDisks() throws Exception {
+ createDatabase("db1");
+ createTable("create table db1.tbl1(k1 int) distributed by hash(k1)
buckets 1"
+ + " properties('replication_num' = '3')");
+
+ Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+ Assertions.assertNotNull(tbl);
+ Tablet tablet = tbl.getPartitions().iterator().next()
+
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+ List<Replica> replicas = tablet.getReplicas();
+ Assertions.assertEquals(3, replicas.size());
+ for (Replica replica : replicas) {
+ Assertions.assertTrue(replica.getPathHash() != -1L);
+ }
+
+ Assertions.assertEquals(replicas,
+ tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+
+ // disk mark as bad
+ Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId())
+ .getDisks().values().forEach(disk ->
disk.setState(DiskInfo.DiskState.OFFLINE));
+
+ // lost disk
+ replicas.get(1).setPathHash(-123321L);
+
+ Assertions.assertEquals(Lists.newArrayList(replicas.get(2)),
+ tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+ }
+
+ private Map<Long, Set<Long>> getAlivePathHashs() {
+ Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+ for (Backend backend :
Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
+ backendAlivePathHashs.put(backend.getId(),
backend.getDisks().values().stream()
+
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+ }
+
+ return backendAlivePathHashs;
+ }
+
+}
+
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 3934e140f67..9e8ff913ada 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -41,6 +41,7 @@ import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TCloneReq;
+import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -95,7 +96,9 @@ import org.apache.thrift.TException;
import java.io.IOException;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
/*
* This class is used to create mock backends.
@@ -203,6 +206,9 @@ public class MockedBackendFactory {
TTaskType taskType = request.getTaskType();
switch (taskType) {
case CREATE:
+ ++reportVersion;
+ handleCreateTablet(request,
finishTaskRequest);
+ break;
case ALTER:
++reportVersion;
break;
@@ -210,6 +216,7 @@ public class MockedBackendFactory {
handleDropTablet(request,
finishTaskRequest);
break;
case CLONE:
+ ++reportVersion;
handleCloneTablet(request,
finishTaskRequest);
break;
case STORAGE_MEDIUM_MIGRATE:
@@ -235,6 +242,30 @@ public class MockedBackendFactory {
}
}
+ private void handleCreateTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
+ TCreateTabletReq req = request.getCreateTabletReq();
+ List<DiskInfo> candDisks =
backendInFe.getDisks().values().stream()
+ .filter(disk -> req.storage_medium ==
disk.getStorageMedium() && disk.isAlive())
+ .collect(Collectors.toList());
+ if (candDisks.isEmpty()) {
+ candDisks = backendInFe.getDisks().values().stream()
+ .filter(DiskInfo::isAlive)
+ .collect(Collectors.toList());
+ }
+ DiskInfo choseDisk = candDisks.isEmpty() ? null
+ : candDisks.get(new
Random().nextInt(candDisks.size()));
+
+ List<TTabletInfo> tabletInfos = Lists.newArrayList();
+ TTabletInfo tabletInfo = new TTabletInfo();
+ tabletInfo.setTabletId(req.tablet_id);
+ tabletInfo.setVersion(req.version);
+ tabletInfo.setPathHash(choseDisk == null ? -1L :
choseDisk.getPathHash());
+ tabletInfo.setReplicaId(req.replica_id);
+ tabletInfo.setUsed(true);
+ tabletInfos.add(tabletInfo);
+ finishTaskRequest.setFinishTabletInfos(tabletInfos);
+ }
+
private void handleDropTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
TDropTabletReq req = request.getDropTabletReq();
long dataSize = Math.max(1,
CatalogTestUtil.getTabletDataSize(req.tablet_id));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]