This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 93ccef4 [Feature] Add degradate strategy for local_replica_selection.
(#7064)
93ccef4 is described below
commit 93ccef4ec764e9ba153947d1657006ac65907efa
Author: 曹建华 <[email protected]>
AuthorDate: Sat Nov 13 17:09:25 2021 +0800
[Feature] Add degradate strategy for local_replica_selection. (#7064)
When local_replica_selection is turned on, support selecting a non-local BE to
serve the query when the local BE is unavailable.
---
docs/en/administrator-guide/config/fe_config.md | 10 ++++++-
docs/zh-CN/administrator-guide/config/fe_config.md | 10 ++++++-
.../main/java/org/apache/doris/catalog/Tablet.java | 7 ++---
.../main/java/org/apache/doris/common/Config.java | 14 ++++++++--
.../org/apache/doris/planner/OlapScanNode.java | 24 ++++------------
.../main/java/org/apache/doris/qe/Coordinator.java | 32 ++++++++++++++++++++--
6 files changed, 67 insertions(+), 30 deletions(-)
diff --git a/docs/en/administrator-guide/config/fe_config.md
b/docs/en/administrator-guide/config/fe_config.md
index 2852e88..85f2249 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -783,7 +783,15 @@ If set to true, Planner will try to select replica of
tablet on same host as thi
- N hosts with N Backends and N Frontends deployed.
- The data has N replicas.
- High concurrency queries are sent to all Frontends evenly
-- In this case, all Frontends can only use local replicas to do the query.
+- In this case, all Frontends can only use local replicas to do the query. If
you want to allow fallback to nonlocal replicas when no local replicas are
available, set enable_local_replica_selection_fallback to true.
+
+### enable_local_replica_selection_fallback
+
+Default:false
+
+IsMutable:true
+
+Used with enable_local_replica_selection. If the local replicas are not
available, fall back to the nonlocal replicas.
### max_unfinished_load_job
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md
b/docs/zh-CN/administrator-guide/config/fe_config.md
index d67952c..9c0fe26 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -775,7 +775,15 @@ fe 会在每隔 es_state_sync_interval_secs 调用 es api 获取 es 索引分片
3. 高并发查询均匀发送到所有 Frontends
-在这种情况下,所有 Frontends 只能使用本地副本进行查询。
+在这种情况下,所有 Frontends 只能使用本地副本进行查询。如果想当本地副本不可用时,使用非本地副本服务查询,请将
enable_local_replica_selection_fallback 设置为 true
+
+### enable_local_replica_selection_fallback
+
+默认值:false
+
+是否可以动态配置:true
+
+与 enable_local_replica_selection 配合使用,当本地副本不可用时,使用非本地副本服务查询。
### max_unfinished_load_job
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index f40216c..fe6b7a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -213,8 +213,8 @@ public class Tablet extends MetaObject implements Writable {
}
// for query
- public void getQueryableReplicas(List<Replica> allQuerableReplica,
List<Replica> localReplicas,
- long visibleVersion, long visibleVersionHash, long localBeId, int
schemaHash) {
+ public void getQueryableReplicas(List<Replica> allQuerableReplica, long
visibleVersion,
+ long visibleVersionHash, int schemaHash) {
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
@@ -231,9 +231,6 @@ public class Tablet extends MetaObject implements Writable {
if (replica.checkVersionCatchUp(visibleVersion,
visibleVersionHash, false)
&& (replica.getSchemaHash() == -1 ||
replica.getSchemaHash() == schemaHash)) {
allQuerableReplica.add(replica);
- if (localBeId != -1 && replica.getBackendId() ==
localBeId) {
- localReplicas.add(replica);
- }
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index cd02ec1..3ab873e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -168,7 +168,7 @@ public class Config extends ConfigBase {
*/
@ConfField
public static int label_clean_interval_second = 1 * 3600; // 1 hours
-
+
// Configurations for meta data durability
/**
* Doris meta data will be saved here.
@@ -1015,11 +1015,21 @@ public class Config extends ConfigBase {
* 2. The data has N replicas.
* 3. High concurrency queries are sent to all Frontends evenly
* In this case, all Frontends can only use local replicas to do the query.
+ * If you want to allow fallback to nonlocal replicas when no local
replicas are available,
+ * set enable_local_replica_selection_fallback to true.
*/
@ConfField(mutable = true)
public static boolean enable_local_replica_selection = false;
/**
+ * Used with enable_local_replica_selection.
+ * If the local replicas are not available, fall back to the nonlocal
replicas.
+ * */
+ @ConfField(mutable = true)
+ public static boolean enable_local_replica_selection_fallback = false;
+
+
+ /**
* The timeout of executing async remote fragment.
* In normal case, the async remote fragment will be executed in a short
time. If the system is under high load
* condition, try to set this timeout longer.
@@ -1475,7 +1485,7 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = false, masterOnly = true)
public static int partition_in_memory_update_interval_secs = 300;
-
+
@ConfField(masterOnly = true)
public static boolean enable_concurrent_update = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d980c28..a1c5728 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -455,8 +455,7 @@ public class OlapScanNode extends ScanNode {
private void addScanRangeLocations(Partition partition,
MaterializedIndex index,
- List<Tablet> tablets,
- long localBeId) throws UserException {
+ List<Tablet> tablets) throws
UserException {
int logNum = 0;
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
String schemaHashStr = String.valueOf(schemaHash);
@@ -484,11 +483,9 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
- List<Replica> allQueryableReplicas = Lists.newArrayList();
- List<Replica> localReplicas = Lists.newArrayList();
- tablet.getQueryableReplicas(allQueryableReplicas, localReplicas,
- visibleVersion, visibleVersionHash, localBeId, schemaHash);
- if (allQueryableReplicas.isEmpty()) {
+ List<Replica> replicas = Lists.newArrayList();
+ tablet.getQueryableReplicas(replicas, visibleVersion,
visibleVersionHash, schemaHash);
+ if (replicas.isEmpty()) {
LOG.error("no queryable replica found in tablet {}. visible
version {}-{}",
tabletId, visibleVersion, visibleVersionHash);
if (LOG.isDebugEnabled()) {
@@ -499,13 +496,6 @@ public class OlapScanNode extends ScanNode {
throw new UserException("Failed to get scan range, no
queryable replica found in tablet: " + tabletId);
}
- List<Replica> replicas = null;
- if (!localReplicas.isEmpty()) {
- replicas = localReplicas;
- } else {
- replicas = allQueryableReplicas;
- }
-
Collections.shuffle(replicas);
boolean tabletIsNull = true;
boolean collectedStat = false;
@@ -621,10 +611,6 @@ public class OlapScanNode extends ScanNode {
}
private void computeTabletInfo() throws UserException {
- long localBeId = -1;
- if (Config.enable_local_replica_selection) {
- localBeId =
Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
- }
/**
* The tablet info could be computed only once.
* So the scanBackendIds should be empty in the beginning.
@@ -655,7 +641,7 @@ public class OlapScanNode extends ScanNode {
totalTabletsNum += selectedTable.getTablets().size();
selectedTabletsNum += tablets.size();
- addScanRangeLocations(partition, selectedTable, tablets,
localBeId);
+ addScanRangeLocations(partition, selectedTable, tablets);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a091d87..1e69a03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1411,17 +1411,45 @@ public class Coordinator {
public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations
seqLocation,
HashMap<TNetworkAddress, Long>
assignedBytesPerHost,
Reference<Long> backendIdRef) throws
UserException {
+ if (!Config.enable_local_replica_selection) {
+ return selectBackendsByRoundRobin(seqLocation.getLocations(),
assignedBytesPerHost, backendIdRef);
+ }
+
+ List<TScanRangeLocation> localLocations = new ArrayList<>();
+ List<TScanRangeLocation> nonlocalLocations = new ArrayList<>();
+ long localBeId =
Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
+ for (final TScanRangeLocation location : seqLocation.getLocations()) {
+ if (location.backend_id == localBeId) {
+ localLocations.add(location);
+ } else {
+ nonlocalLocations.add(location);
+ }
+ }
+
+ try {
+ return selectBackendsByRoundRobin(localLocations,
assignedBytesPerHost, backendIdRef);
+ } catch (UserException ue) {
+ if (!Config.enable_local_replica_selection_fallback) {
+ throw ue;
+ }
+ return selectBackendsByRoundRobin(nonlocalLocations,
assignedBytesPerHost, backendIdRef);
+ }
+ }
+
+ public TScanRangeLocation
selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
+
HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+ Reference<Long>
backendIdRef) throws UserException {
Long minAssignedBytes = Long.MAX_VALUE;
TScanRangeLocation minLocation = null;
Long step = 1L;
- for (final TScanRangeLocation location : seqLocation.getLocations()) {
+ for (final TScanRangeLocation location : locations) {
Long assignedBytes = findOrInsert(assignedBytesPerHost,
location.server, 0L);
if (assignedBytes < minAssignedBytes) {
minAssignedBytes = assignedBytes;
minLocation = location;
}
}
- TScanRangeLocation location = SimpleScheduler.getLocation(minLocation,
seqLocation.locations, this.idToBackend, backendIdRef);
+ TScanRangeLocation location = SimpleScheduler.getLocation(minLocation,
locations, this.idToBackend, backendIdRef);
if (assignedBytesPerHost.containsKey(location.server)) {
assignedBytesPerHost.put(location.server,
assignedBytesPerHost.get(location.server) + step);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]