This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9103d77b33d [opt](blacklist) Backend should not be added to blacklist
easily #41170 (#41957)
9103d77b33d is described below
commit 9103d77b33dc30eab26a2aaf9fe9f61027663f81
Author: zhiqiang <[email protected]>
AuthorDate: Thu Oct 17 11:34:53 2024 +0800
[opt](blacklist) Backend should not be added to blacklist easily #41170
(#41957)
cherry pick from #41170
---
.../main/java/org/apache/doris/common/Config.java | 14 ++
.../java/org/apache/doris/qe/SimpleScheduler.java | 200 +++++++++++++++++++--
.../org/apache/doris/qe/SimpleSchedulerTest.java | 32 +++-
3 files changed, 219 insertions(+), 27 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1ddfc216068..a33f59b4c40 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1963,6 +1963,20 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = false)
public static boolean disable_backend_black_list = false;
+ @ConfField(mutable = true, masterOnly = false, description = {
+ "If a backend is tried to be added to black list
do_add_backend_black_list_threshold_count times "
+ + "in do_add_backend_black_list_threshold_seconds, it will be
added to black list."})
+ public static long do_add_backend_black_list_threshold_count = 10;
+
+ @ConfField(mutable = true, masterOnly = false, description = {
+ "If a backend is tried to be added to black list
do_add_backend_black_list_threshold_count times "
+ + "in do_add_backend_black_list_threshold_seconds, it will be
added to black list."})
+ public static long do_add_backend_black_list_threshold_seconds = 30;
+
+ @ConfField(mutable = true, masterOnly = false, description = {
+ "Backend will stay in black list for this time after it is added to
black list."})
+ public static long stay_in_backend_black_list_threshold_seconds = 60;
+
/**
* Maximum backend heartbeat failure tolerance count.
* Default is 1, which means if 1 heart failed, the backend will be marked
as dead.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index f8d30f08b9b..131815cfe20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
-import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
@@ -27,6 +26,7 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocation;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -34,22 +34,170 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class SimpleScheduler {
private static final Logger LOG =
LogManager.getLogger(SimpleScheduler.class);
+ public static class BlackListInfo {
+ public BlackListInfo() {}
+
+ private Lock lock = new ReentrantLock();
+ // Record the reason why this backend is added to black list, will be
updated only once.
+ private String reasonForBlackList = "";
+ // Record the first time this backend is tried to be added to black
list.
+ private Long firstRecordBlackTimestampMs = 0L;
+ // Record the last time this backend is tried to be added to black
list.
+ private Long lastRecordBlackTimestampMs = 0L;
+ // Record the timestamp this backend is really regarded as blacked.
+ private Long lastBlackTimestampMs = 0L;
+ // Record the number of attempts to add this backend to black list in the current time window.
+ private Long recordBlackListCount = 0L;
+ // Record the backend id
+ private Long backendID = 0L;
+
+ // Try to add this backend to black list, backend is not really added
to black list until
+ // condition in shouldBeBlackListed is met.
+ public void tryAddBlackList(String reason) {
+ lock.lock();
+ try {
+ recordAddBlackList(reason);
+ if (shouldBeBlackListed()) {
+ doAddBlackList();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Just update the fields.
+ private void recordAddBlackList(String reason) {
+ if (firstRecordBlackTimestampMs <= 0) {
+ firstRecordBlackTimestampMs = System.currentTimeMillis();
+ }
+
+ lastRecordBlackTimestampMs = System.currentTimeMillis();
+
+ // Restart the counter if the time interval is too long
+ if (lastRecordBlackTimestampMs - firstRecordBlackTimestampMs
+ >= Config.do_add_backend_black_list_threshold_seconds *
1000) {
+ firstRecordBlackTimestampMs = lastRecordBlackTimestampMs;
+ recordBlackListCount = 0L;
+ }
+
+ recordBlackListCount++;
+
+ if (Strings.isNullOrEmpty(reasonForBlackList) &&
!Strings.isNullOrEmpty(reason)) {
+ reasonForBlackList = reason;
+ }
+ }
+
+ private boolean shouldBeBlackListed() {
+ if (lastRecordBlackTimestampMs <= 0 || firstRecordBlackTimestampMs
<= 0) {
+ return false;
+ }
+
+ if (recordBlackListCount <
Config.do_add_backend_black_list_threshold_count) {
+ return false;
+ }
+
+ if (lastRecordBlackTimestampMs - firstRecordBlackTimestampMs
+ >= Config.do_add_backend_black_list_threshold_seconds *
1000) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private void doAddBlackList() {
+ lastBlackTimestampMs = System.currentTimeMillis();
+ Exception e = new Exception();
+ String stack =
org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace(e);
+ LOG.warn("Backend is added to black
list.\nInformation:\n{}\nStack:\n{}", toString(), stack);
+ }
+
+ public boolean shouldBeRemoved() {
+ lock.lock();
+ try {
+ if (lastBlackTimestampMs <= 0) {
+ return false;
+ }
+
+ Long currentTimeStamp = System.currentTimeMillis();
+ // If this backend has stayed in black list for at least
stay_in_backend_black_list_threshold_seconds, then regard it as normal
+ if (currentTimeStamp - lastBlackTimestampMs
+ >= Config.stay_in_backend_black_list_threshold_seconds
* 1000) {
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isBlacked() {
+ lock.lock();
+ try {
+ return lastBlackTimestampMs > 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String getReasonForBlackList() {
+ lock.lock();
+ try {
+ return reasonForBlackList;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ lock.lock();
+ // Convert to human readable time
+ LocalDateTime firstRecordBlackTimes = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(firstRecordBlackTimestampMs),
ZoneId.systemDefault());
+ LocalDateTime lastRecordBlackTimes = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(lastRecordBlackTimestampMs),
ZoneId.systemDefault());
+ LocalDateTime lastBlackTimes = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(lastBlackTimestampMs),
ZoneId.systemDefault());
+ try {
+ sb.append("\nbackendID: ").append(backendID).append("\n");
+ sb.append("reasonForBlackList:
").append(reasonForBlackList).append("\n");
+ sb.append("firstRecordBlackTimestampMs:
").append(firstRecordBlackTimes).append("\n");
+ sb.append("lastRecordBlackTimestampMs:
").append(lastRecordBlackTimes).append("\n");
+ sb.append("lastBlackTimestampMs:
").append(lastBlackTimes).append("\n");
+ sb.append("recordBlackListCount:
").append(recordBlackListCount).append("\n");
+ sb.append("Config.do_add_backend_black_list_threshold_seconds:
")
+
.append(Config.do_add_backend_black_list_threshold_seconds).append("\n");
+
sb.append("Config.stay_in_backend_black_list_threshold_seconds: ")
+
.append(Config.stay_in_backend_black_list_threshold_seconds).append("\n");
+ return sb.toString();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
private static AtomicLong nextId = new AtomicLong(0);
// backend id -> (try time, reason)
// There will be multi threads to read and modify this map.
// But only one thread (UpdateBlacklistThread) will modify the `Pair`.
// So using concurrent map is enough
- private static Map<Long, Pair<Integer, String>> blacklistBackends =
Maps.newConcurrentMap();
+ private static Map<Long, BlackListInfo> blacklistBackends =
Maps.newConcurrentMap();
private static UpdateBlacklistThread updateBlacklistThread;
public static void init() {
@@ -68,6 +216,7 @@ public class SimpleScheduler {
if (LOG.isDebugEnabled()) {
LOG.debug("getHost backendID={}, backendSize={}", backendId,
backends.size());
}
+
Backend backend = backends.get(backendId);
if (isAvailable(backend)) {
@@ -154,13 +303,13 @@ public class SimpleScheduler {
for (int i = 0; i < backendIds.size() && i < limit; i++) {
long beId = backendIds.get(i);
Backend be = backends.get(beId);
+ BlackListInfo blackListInfo = blacklistBackends.get(beId);
if (be == null) {
res.add(beId + ": not exist");
} else if (!be.isAlive()) {
res.add(beId + ": not alive");
- } else if (blacklistBackends.containsKey(beId)) {
- Pair<Integer, String> pair = blacklistBackends.get(beId);
- res.add(beId + ": in black list(" + (pair == null ? "unknown"
: pair.second) + ")");
+ } else if (blackListInfo != null && blackListInfo.isBlacked()) {
+ res.add(beId + ": in black list(" +
blackListInfo.getReasonForBlackList() + ")");
} else if (!be.isQueryAvailable()) {
res.add(beId + ": disable query");
} else {
@@ -177,12 +326,26 @@ public class SimpleScheduler {
return;
}
- blacklistBackends.put(backendID,
Pair.of(Config.blacklist_duration_second + 1, reason));
- LOG.warn("add backend {} to black list. reason: {}", backendID,
reason);
+ BlackListInfo blackListInfo = blacklistBackends.putIfAbsent(backendID,
new BlackListInfo());
+ if (blackListInfo == null) {
+ blackListInfo = blacklistBackends.get(backendID);
+ }
+
+ blackListInfo.tryAddBlackList(reason);
}
public static boolean isAvailable(Backend backend) {
- return (backend != null && backend.isQueryAvailable() &&
!blacklistBackends.containsKey(backend.getId()));
+ if (backend == null) {
+ return false;
+ }
+ if (!backend.isQueryAvailable()) {
+ return false;
+ }
+ BlackListInfo blackListInfo = blacklistBackends.get(backend.getId());
+ if (blackListInfo != null && blackListInfo.isBlacked()) {
+ return false;
+ }
+ return true;
}
private static class UpdateBlacklistThread implements Runnable {
@@ -208,25 +371,26 @@ public class SimpleScheduler {
Thread.sleep(1000L);
SystemInfoService clusterInfoService =
Env.getCurrentSystemInfo();
- Iterator<Map.Entry<Long, Pair<Integer, String>>> iterator
= blacklistBackends.entrySet().iterator();
+ Iterator<Map.Entry<Long, SimpleScheduler.BlackListInfo>>
+ iterator = blacklistBackends.entrySet().iterator();
+
while (iterator.hasNext()) {
- Map.Entry<Long, Pair<Integer, String>> entry =
iterator.next();
+ Map.Entry<Long, SimpleScheduler.BlackListInfo> entry =
iterator.next();
Long backendId = entry.getKey();
-
+ Backend backend =
clusterInfoService.getBackend(backendId);
// remove from blacklist if backend does not exist
anymore
- if (clusterInfoService.getBackend(backendId) == null) {
+ if (backend == null) {
iterator.remove();
LOG.info("remove backend {} from black list
because it does not exist", backendId);
} else {
- // 3. max try time is reach
- entry.getValue().first = entry.getValue().first -
1;
- if (entry.getValue().first <= 0) {
+ BlackListInfo blackListInfo = entry.getValue();
+ if (backend.isAlive() ||
blackListInfo.shouldBeRemoved()) {
iterator.remove();
- LOG.warn("remove backend {} from black list.
reach max try time", backendId);
+ LOG.info("remove backend {} from black list.
backend is alive: {}",
+ backendId, backend.isAlive());
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("blacklistBackends backendID={}
retryTimes={}",
- backendId, entry.getValue().first);
+ LOG.debug("blacklistBackends {}",
blackListInfo.toString());
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
index ac13900d2cf..86ff4864fd3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
@@ -17,10 +17,12 @@
package org.apache.doris.qe;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocation;
@@ -108,8 +110,8 @@ public class SimpleSchedulerTest {
@Override
public void run() {
try {
- Set<String> resBackends = Sets.newHashSet();
long start = System.currentTimeMillis();
+ Set<String> resBackends = Sets.newHashSet();
for (int i = 0; i < 1000; i++) {
TNetworkAddress address =
SimpleScheduler.getHost(backends, ref);
Assert.assertNotNull(address);
@@ -126,20 +128,29 @@ public class SimpleSchedulerTest {
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
- SimpleScheduler.addToBlacklist(be1.getId(), "test");
+ for (int i = 0; i < 100; i++) {
+ SimpleScheduler.addToBlacklist(be1.getId(), "test");
+ }
}
});
+ SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
+ be1.setAlive(false);
+ clusterInfoService.addBackend(be1);
t3.start();
+ t3.join();
+
+ // When t1 and t2 start, be1 is already in the blacklist.
t1.start();
t2.start();
t1.join();
t2.join();
- t3.join();
Assert.assertFalse(SimpleScheduler.isAvailable(be1));
- Thread.sleep((Config.heartbeat_interval_second + 5) * 1000);
+ be1.setAlive(true);
+ // Sleep 5s so that UpdateBlacklistThread will remove be1 from
blacklist
+ Thread.sleep(1000 * 5L);
Assert.assertTrue(SimpleScheduler.isAvailable(be1));
}
@@ -182,11 +193,14 @@ public class SimpleSchedulerTest {
scanRangeLocation5.setBackendId(be5.getId());
locations.add(scanRangeLocation5);
- SimpleScheduler.addToBlacklist(be1.getId(), "test");
- SimpleScheduler.addToBlacklist(be2.getId(), "test");
- SimpleScheduler.addToBlacklist(be3.getId(), "test");
- SimpleScheduler.addToBlacklist(be4.getId(), "test");
- SimpleScheduler.addToBlacklist(be5.getId(), "test");
+ for (int i = 0; i <= Config.do_add_backend_black_list_threshold_count;
i++) {
+ SimpleScheduler.addToBlacklist(be1.getId(), "test");
+ SimpleScheduler.addToBlacklist(be2.getId(), "test");
+ SimpleScheduler.addToBlacklist(be3.getId(), "test");
+ SimpleScheduler.addToBlacklist(be4.getId(), "test");
+ SimpleScheduler.addToBlacklist(be5.getId(), "test");
+ }
+
try {
SimpleScheduler.getHost(locations.get(0).backend_id, locations,
backends, ref);
Assert.fail();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]