This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9103d77b33d [opt](blacklist) Backend should not be added to blacklist easily #41170 (#41957)
9103d77b33d is described below

commit 9103d77b33dc30eab26a2aaf9fe9f61027663f81
Author: zhiqiang <[email protected]>
AuthorDate: Thu Oct 17 11:34:53 2024 +0800

    [opt](blacklist) Backend should not be added to blacklist easily #41170 (#41957)
    
    cherry pick from #41170
---
 .../main/java/org/apache/doris/common/Config.java  |  14 ++
 .../java/org/apache/doris/qe/SimpleScheduler.java  | 200 +++++++++++++++++++--
 .../org/apache/doris/qe/SimpleSchedulerTest.java   |  32 +++-
 3 files changed, 219 insertions(+), 27 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1ddfc216068..a33f59b4c40 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1963,6 +1963,20 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static boolean disable_backend_black_list = false;
 
+    @ConfField(mutable = true, masterOnly = false, description = {
+        "If there are at least do_add_backend_black_list_threshold_count attempts to add a backend to black list "
+            + "within do_add_backend_black_list_threshold_seconds, it will be added to black list."})
+    public static long do_add_backend_black_list_threshold_count = 10;
+
+    @ConfField(mutable = true, masterOnly = false, description = {
+        "Length, in seconds, of the window in which do_add_backend_black_list_threshold_count attempts to add "
+            + "a backend to black list must happen before the backend is really added to black list."})
+    public static long do_add_backend_black_list_threshold_seconds = 30;
+
+    @ConfField(mutable = true, masterOnly = false, description = {
+        "Time, in seconds, a backend stays in black list after it is added to black list."})
+    public static long stay_in_backend_black_list_threshold_seconds = 60;
+
     /**
      * Maximum backend heartbeat failure tolerance count.
      * Default is 1, which means if 1 heart failed, the backend will be marked 
as dead.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index f8d30f08b9b..131815cfe20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 import org.apache.doris.system.Backend;
@@ -27,6 +26,7 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeLocation;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -34,22 +34,170 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 public class SimpleScheduler {
     private static final Logger LOG = 
LogManager.getLogger(SimpleScheduler.class);
 
+    /**
+     * Black list bookkeeping for a single backend.
+     *
+     * <p>{@link #tryAddBlackList(String)} only records an attempt. The backend is really regarded as blacked
+     * once at least {@code Config.do_add_backend_black_list_threshold_count} attempts have been recorded within
+     * {@code Config.do_add_backend_black_list_threshold_seconds}. Every mutable field is read and written while
+     * holding {@code lock}, so one instance can be shared between threads.
+     */
+    public static class BlackListInfo {
+        private final Lock lock = new ReentrantLock();
+        // Reason for black listing this backend; set from the first non-empty reason and never replaced.
+        private String reasonForBlackList = "";
+        // Start (epoch ms) of the current counting window; 0 if nothing has been recorded yet.
+        private long firstRecordBlackTimestampMs = 0L;
+        // Time (epoch ms) of the most recent attempt to black list this backend; 0 if none yet.
+        private long lastRecordBlackTimestampMs = 0L;
+        // Time (epoch ms) this backend was last really regarded as blacked; 0 means it never was.
+        private long lastBlackTimestampMs = 0L;
+        // Number of attempts recorded in the current counting window.
+        private long recordBlackListCount = 0L;
+        // Id of the backend this entry belongs to; only used in log output. 0 when not supplied.
+        private final long backendID;
+
+        public BlackListInfo() {
+            this(0L);
+        }
+
+        /**
+         * @param backendID id of the backend this entry describes, reported by {@link #toString()}
+         */
+        public BlackListInfo(long backendID) {
+            this.backendID = backendID;
+        }
+
+        /**
+         * Records one attempt to black list this backend, and really blacks it once the threshold is met.
+         *
+         * @param reason why the caller wants to black list the backend; only the first non-empty one is kept
+         */
+        public void tryAddBlackList(String reason) {
+            lock.lock();
+            try {
+                recordAddBlackList(reason);
+                if (shouldBeBlackListed()) {
+                    doAddBlackList();
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        // Only updates the attempt bookkeeping. Caller must hold lock.
+        private void recordAddBlackList(String reason) {
+            long now = System.currentTimeMillis();
+            if (firstRecordBlackTimestampMs <= 0) {
+                firstRecordBlackTimestampMs = now;
+            }
+            lastRecordBlackTimestampMs = now;
+
+            // Restart the counting window once it has lasted at least the configured threshold.
+            if (lastRecordBlackTimestampMs - firstRecordBlackTimestampMs
+                    >= Config.do_add_backend_black_list_threshold_seconds * 1000) {
+                firstRecordBlackTimestampMs = lastRecordBlackTimestampMs;
+                recordBlackListCount = 0L;
+            }
+
+            recordBlackListCount++;
+
+            if (Strings.isNullOrEmpty(reasonForBlackList) && !Strings.isNullOrEmpty(reason)) {
+                reasonForBlackList = reason;
+            }
+        }
+
+        // True when enough attempts were recorded inside the current window. Caller must hold lock.
+        private boolean shouldBeBlackListed() {
+            if (lastRecordBlackTimestampMs <= 0 || firstRecordBlackTimestampMs <= 0) {
+                return false;
+            }
+            if (recordBlackListCount < Config.do_add_backend_black_list_threshold_count) {
+                return false;
+            }
+            return lastRecordBlackTimestampMs - firstRecordBlackTimestampMs
+                    < Config.do_add_backend_black_list_threshold_seconds * 1000;
+        }
+
+        // Marks the backend as blacked now and logs the caller's stack for diagnosis. Caller must hold lock.
+        private void doAddBlackList() {
+            lastBlackTimestampMs = System.currentTimeMillis();
+            String stack = org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace(new Exception());
+            LOG.warn("Backend is added to black list.\nInformation:\n{}\nStack:\n{}", toString(), stack);
+        }
+
+        /**
+         * Returns true once this backend has been blacked and its last black time is at least
+         * {@code Config.stay_in_backend_black_list_threshold_seconds} in the past.
+         */
+        public boolean shouldBeRemoved() {
+            lock.lock();
+            try {
+                if (lastBlackTimestampMs <= 0) {
+                    return false;
+                }
+                long currentTimeStamp = System.currentTimeMillis();
+                return currentTimeStamp - lastBlackTimestampMs
+                        >= Config.stay_in_backend_black_list_threshold_seconds * 1000;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        /** Returns true if this backend has ever been really regarded as blacked. */
+        public boolean isBlacked() {
+            lock.lock();
+            try {
+                return lastBlackTimestampMs > 0;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        /** Returns the first non-empty reason recorded for this backend, or "" if none was given. */
+        public String getReasonForBlackList() {
+            lock.lock();
+            try {
+                return reasonForBlackList;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public String toString() {
+            // lock() is immediately followed by try so the lock cannot leak if any work below throws.
+            lock.lock();
+            try {
+                // Convert epoch milliseconds to human readable local date-times.
+                ZoneId zone = ZoneId.systemDefault();
+                LocalDateTime firstRecordBlackTimes = LocalDateTime.ofInstant(
+                        Instant.ofEpochMilli(firstRecordBlackTimestampMs), zone);
+                LocalDateTime lastRecordBlackTimes = LocalDateTime.ofInstant(
+                        Instant.ofEpochMilli(lastRecordBlackTimestampMs), zone);
+                LocalDateTime lastBlackTimes = LocalDateTime.ofInstant(
+                        Instant.ofEpochMilli(lastBlackTimestampMs), zone);
+                StringBuilder sb = new StringBuilder();
+                sb.append("\nbackendID: ").append(backendID).append("\n");
+                sb.append("reasonForBlackList: ").append(reasonForBlackList).append("\n");
+                sb.append("firstRecordBlackTimestampMs: ").append(firstRecordBlackTimes).append("\n");
+                sb.append("lastRecordBlackTimestampMs: ").append(lastRecordBlackTimes).append("\n");
+                sb.append("lastBlackTimestampMs: ").append(lastBlackTimes).append("\n");
+                sb.append("recordBlackListCount: ").append(recordBlackListCount).append("\n");
+                sb.append("Config.do_add_backend_black_list_threshold_seconds: ")
+                        .append(Config.do_add_backend_black_list_threshold_seconds).append("\n");
+                sb.append("Config.stay_in_backend_black_list_threshold_seconds: ")
+                        .append(Config.stay_in_backend_black_list_threshold_seconds).append("\n");
+                return sb.toString();
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
     private static AtomicLong nextId = new AtomicLong(0);

-    // backend id -> (try time, reason)
-    // There will be multi threads to read and modify this map.
-    // But only one thread (UpdateBlacklistThread) will modify the `Pair`.
-    // So using concurrent map is enough
-    private static Map<Long, Pair<Integer, String>> blacklistBackends = Maps.newConcurrentMap();
+    // backend id -> BlackListInfo (attempt counter, timestamps and reason for that backend).
+    // Multiple threads read this map and insert entries (addToBlacklist), while UpdateBlacklistThread
+    // removes entries through the map's iterator. Each BlackListInfo guards its own state with a lock,
+    // so a concurrent map is enough. An entry may be removed right after it was inserted, so code must
+    // not assume that a get() following putIfAbsent() returns non-null.
+    private static Map<Long, BlackListInfo> blacklistBackends = Maps.newConcurrentMap();
     private static UpdateBlacklistThread updateBlacklistThread;
 
     public static void init() {
@@ -68,6 +216,7 @@ public class SimpleScheduler {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getHost backendID={}, backendSize={}", backendId, 
backends.size());
         }
+
         Backend backend = backends.get(backendId);
 
         if (isAvailable(backend)) {
@@ -154,13 +303,13 @@ public class SimpleScheduler {
         for (int i = 0; i < backendIds.size() && i < limit; i++) {
             long beId = backendIds.get(i);
             Backend be = backends.get(beId);
+            BlackListInfo blackListInfo = blacklistBackends.get(beId);
             if (be == null) {
                 res.add(beId + ": not exist");
             } else if (!be.isAlive()) {
                 res.add(beId + ": not alive");
-            } else if (blacklistBackends.containsKey(beId)) {
-                Pair<Integer, String> pair = blacklistBackends.get(beId);
-                res.add(beId + ": in black list(" + (pair == null ? "unknown" 
: pair.second) + ")");
+            } else if (blackListInfo != null && blackListInfo.isBlacked()) {
+                res.add(beId + ": in black list(" + 
blackListInfo.getReasonForBlackList() + ")");
             } else if (!be.isQueryAvailable()) {
                 res.add(beId + ": disable query");
             } else {
@@ -177,12 +326,26 @@ public class SimpleScheduler {
             return;
         }
 
-        blacklistBackends.put(backendID, 
Pair.of(Config.blacklist_duration_second + 1, reason));
-        LOG.warn("add backend {} to black list. reason: {}", backendID, 
reason);
+        BlackListInfo blackListInfo = blacklistBackends.putIfAbsent(backendID, 
new BlackListInfo());
+        if (blackListInfo == null) {
+            blackListInfo = blacklistBackends.get(backendID);
+        }
+
+        blackListInfo.tryAddBlackList(reason);
     }
 
     public static boolean isAvailable(Backend backend) {
-        return (backend != null && backend.isQueryAvailable() && !blacklistBackends.containsKey(backend.getId()));
+        // Usable only if the backend exists, accepts queries and is not currently regarded as blacked.
+        // Merely having an entry in blacklistBackends is not enough: it may still be below the threshold.
+        if (backend == null || !backend.isQueryAvailable()) {
+            return false;
+        }
+        BlackListInfo info = blacklistBackends.get(backend.getId());
+        return info == null || !info.isBlacked();
     }
 
     private static class UpdateBlacklistThread implements Runnable {
@@ -208,25 +371,26 @@ public class SimpleScheduler {
                     Thread.sleep(1000L);
                     SystemInfoService clusterInfoService = 
Env.getCurrentSystemInfo();
 
-                    Iterator<Map.Entry<Long, Pair<Integer, String>>> iterator 
= blacklistBackends.entrySet().iterator();
+                    Iterator<Map.Entry<Long, SimpleScheduler.BlackListInfo>>
+                            iterator = blacklistBackends.entrySet().iterator();
+
                     while (iterator.hasNext()) {
-                        Map.Entry<Long, Pair<Integer, String>> entry = 
iterator.next();
+                        Map.Entry<Long, SimpleScheduler.BlackListInfo> entry = 
iterator.next();
                         Long backendId = entry.getKey();
-
+                        Backend backend = 
clusterInfoService.getBackend(backendId);
                         // remove from blacklist if backend does not exist 
anymore
-                        if (clusterInfoService.getBackend(backendId) == null) {
+                        if (backend == null) {
                             iterator.remove();
                             LOG.info("remove backend {} from black list 
because it does not exist", backendId);
                         } else {
-                            // 3. max try time is reach
-                            entry.getValue().first = entry.getValue().first - 
1;
-                            if (entry.getValue().first <= 0) {
+                            BlackListInfo blackListInfo = entry.getValue();
+                            if (backend.isAlive() || 
blackListInfo.shouldBeRemoved()) {
                                 iterator.remove();
-                                LOG.warn("remove backend {} from black list. 
reach max try time", backendId);
+                                LOG.info("remove backend {} from black list. 
backend is alive: {}",
+                                        backendId, backend.isAlive());
                             } else {
                                 if (LOG.isDebugEnabled()) {
-                                    LOG.debug("blacklistBackends backendID={} 
retryTimes={}",
-                                            backendId, entry.getValue().first);
+                                    LOG.debug("blacklistBackends {}", 
blackListInfo.toString());
                                 }
                             }
                         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
index ac13900d2cf..86ff4864fd3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeLocation;
 
@@ -108,8 +110,8 @@ public class SimpleSchedulerTest {
             @Override
             public void run() {
                 try {
-                    Set<String> resBackends = Sets.newHashSet();
                     long start = System.currentTimeMillis();
+                    Set<String> resBackends = Sets.newHashSet();
                     for (int i = 0; i < 1000; i++) {
                         TNetworkAddress address = 
SimpleScheduler.getHost(backends, ref);
                         Assert.assertNotNull(address);
@@ -126,20 +128,29 @@ public class SimpleSchedulerTest {
         Thread t3 = new Thread(new Runnable() {
             @Override
             public void run() {
-                SimpleScheduler.addToBlacklist(be1.getId(), "test");
+                for (int i = 0; i < 100; i++) {
+                    SimpleScheduler.addToBlacklist(be1.getId(), "test");
+                }
             }
         });
 
+        SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
+        be1.setAlive(false);
+        clusterInfoService.addBackend(be1);
         t3.start();
+        t3.join();
+
+        // When t1 and t2 starts, be1 has already been in blacklist.
         t1.start();
         t2.start();
 
         t1.join();
         t2.join();
-        t3.join();
 
         Assert.assertFalse(SimpleScheduler.isAvailable(be1));
-        Thread.sleep((Config.heartbeat_interval_second + 5) * 1000);
+        be1.setAlive(true);
+        // Sleep 5s so that UpdateBlacklistThread will remove be1 from 
blacklist
+        Thread.sleep(1000 * 5L);
         Assert.assertTrue(SimpleScheduler.isAvailable(be1));
     }
 
@@ -182,11 +193,14 @@ public class SimpleSchedulerTest {
         scanRangeLocation5.setBackendId(be5.getId());
         locations.add(scanRangeLocation5);
 
-        SimpleScheduler.addToBlacklist(be1.getId(), "test");
-        SimpleScheduler.addToBlacklist(be2.getId(), "test");
-        SimpleScheduler.addToBlacklist(be3.getId(), "test");
-        SimpleScheduler.addToBlacklist(be4.getId(), "test");
-        SimpleScheduler.addToBlacklist(be5.getId(), "test");
+        for (int i = 0; i <= Config.do_add_backend_black_list_threshold_count; 
i++) {
+            SimpleScheduler.addToBlacklist(be1.getId(), "test");
+            SimpleScheduler.addToBlacklist(be2.getId(), "test");
+            SimpleScheduler.addToBlacklist(be3.getId(), "test");
+            SimpleScheduler.addToBlacklist(be4.getId(), "test");
+            SimpleScheduler.addToBlacklist(be5.getId(), "test");
+        }
+
         try {
             SimpleScheduler.getHost(locations.get(0).backend_id, locations, 
backends, ref);
             Assert.fail();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to