This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2679fa4ea7f [improvement](tablet clone) further repair replicas should be checked even if their versions have caught up (#25551)
2679fa4ea7f is described below

commit 2679fa4ea7f4555c8ea18018983eede9080f6470
Author: yujun <[email protected]>
AuthorDate: Thu Oct 26 18:14:40 2023 +0800

    [improvement](tablet clone) further repair replicas should be checked even if their versions have caught up (#25551)
---
 be/src/util/debug_points.h                         | 39 ++++++++++--
 be/test/util/debug_points_test.cpp                 | 29 ++++++++-
 .../apache/doris/clone/LoadStatisticForTag.java    | 55 +++++++++++++----
 .../org/apache/doris/clone/TabletSchedCtx.java     | 70 +++++++++++++++-------
 .../org/apache/doris/clone/TabletScheduler.java    | 17 ++++++
 .../apache/doris/common/util/DebugPointUtil.java   | 26 ++++++++
 .../doris/common/util/DebugPointUtilTest.java      |  6 ++
 7 files changed, 201 insertions(+), 41 deletions(-)

diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h
index 47b3aaa9cbd..1106a548f8d 100644
--- a/be/src/util/debug_points.h
+++ b/be/src/util/debug_points.h
@@ -26,7 +26,9 @@
 
 #include "common/compiler_util.h"
 #include "common/config.h"
+#include "fmt/format.h"
 
+// more usage can see 'util/debug_points_test.cpp'
 #define DBUG_EXECUTE_IF(debug_point_name, code)                               \
     if (UNLIKELY(config::enable_debug_points)) {                              \
         auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
@@ -44,7 +46,7 @@ struct DebugPoint {
 
     std::map<std::string, std::string> params;
 
-    template <typename T = int>
+    template <typename T>
     T param(const std::string& key, T default_value = T()) {
         auto it = params.find(key);
         if (it == params.end()) {
@@ -60,25 +62,50 @@ struct DebugPoint {
             return boost::lexical_cast<T>(it->second);
         } else if constexpr (std::is_arithmetic_v<T>) {
             return boost::lexical_cast<T>(it->second);
+        } else if constexpr (std::is_same_v<T, const char*>) {
+            return it->second.c_str();
         } else {
             static_assert(std::is_same_v<T, std::string>);
             return it->second;
         }
     }
-
-    std::string param(const std::string& key, const char* default_value) {
-        return param<std::string>(key, std::string(default_value));
-    }
 };
 
 class DebugPoints {
 public:
     bool is_enable(const std::string& name);
     std::shared_ptr<DebugPoint> get_debug_point(const std::string& name);
-    void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);
     void remove(const std::string& name);
     void clear();
 
+    // if not enable debug point or its params not contains `key`, then return 
`default_value`
+    // url: /api/debug_point/add/name?k1=v1&k2=v2&...
+    template <typename T>
+    T get_debug_param_or_default(const std::string& name, const std::string& 
key,
+                                 const T& default_value) {
+        auto debug_point = get_debug_point(name);
+        return debug_point ? debug_point->param(key, default_value) : 
default_value;
+    }
+
+    // url: /api/debug_point/add/name?value=v
+    template <typename T>
+    T get_debug_param_or_default(const std::string& name, const T& 
default_value) {
+        return get_debug_param_or_default(name, "value", default_value);
+    }
+
+    void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);
+
+    // more 'add' functions for convenient use
+    void add(const std::string& name) { add(name, 
std::make_shared<DebugPoint>()); }
+    void add_with_params(const std::string& name,
+                         const std::map<std::string, std::string>& params) {
+        add(name, std::shared_ptr<DebugPoint>(new DebugPoint {.params = 
params}));
+    }
+    template <typename T>
+    void add_with_value(const std::string& name, const T& value) {
+        add_with_params(name, {{"value", fmt::format("{}", value)}});
+    }
+
     static DebugPoints* instance();
 
 private:
diff --git a/be/test/util/debug_points_test.cpp 
b/be/test/util/debug_points_test.cpp
index df86cf0d1b9..76c4fd00781 100644
--- a/be/test/util/debug_points_test.cpp
+++ b/be/test/util/debug_points_test.cpp
@@ -60,7 +60,7 @@ TEST(DebugPointsTest, BaseTest) {
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param<int>("v1", 100)));
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2")));
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string())));
-    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", "b")));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b")));
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3")));
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0)));
     DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false)));
@@ -68,7 +68,32 @@ TEST(DebugPointsTest, BaseTest) {
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist")));
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L)));
     DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123)));
-    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("abcd", dp->param("v_not_exist", 
"abcd")));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", 
"abcd")));
+
+    EXPECT_EQ(1.2, 
DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0));
+    EXPECT_EQ(100,
+              
DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 
100));
+
+    POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567");
+    EXPECT_EQ(567, 
DebugPoints::instance()->get_debug_param_or_default("dbug6", 0));
+}
+
+TEST(DebugPointsTest, AddTest) {
+    config::enable_debug_points = true;
+    DebugPoints::instance()->clear();
+
+    DebugPoints::instance()->add("dbug1");
+    EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1"));
+
+    DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}});
+    EXPECT_EQ(100, 
DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0));
+
+    DebugPoints::instance()->add_with_value("dbug3", 567);
+    EXPECT_EQ(567, 
DebugPoints::instance()->get_debug_param_or_default("dbug3", 567));
+
+    DebugPoints::instance()->add_with_value("dbug4", "hello");
+    EXPECT_EQ("hello",
+              
DebugPoints::instance()->get_debug_param_or_default<std::string>("dbug4", ""));
 }
 
 } // namespace doris
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
index 413a3b129f1..158f2cde4af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.BackendLoadStatistic.Classification;
 import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
@@ -186,35 +187,58 @@ public class LoadStatisticForTag {
         int lowCounter = 0;
         int midCounter = 0;
         int highCounter = 0;
+
+        long debugHighBeId = 
DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
+        if (debugHighBeId > 0) {
+            final long targetBeId = debugHighBeId; // debugHighBeId can not 
put in lambda cause it's updated later
+            if (!beLoadStatistics.stream().anyMatch(it -> it.getBeId() == 
targetBeId)) {
+                debugHighBeId = -1L;
+            }
+        }
+
         for (BackendLoadStatistic beStat : beLoadStatistics) {
             if (!beStat.hasMedium(medium)) {
                 continue;
             }
 
-
-            if (Config.be_rebalancer_fuzzy_test) {
+            Classification clazz = Classification.MID;
+            if (debugHighBeId > 0) {
+                if (beStat.getBeId() == debugHighBeId) {
+                    clazz = Classification.HIGH;
+                } else {
+                    clazz = Classification.LOW;
+                }
+            } else if (Config.be_rebalancer_fuzzy_test) {
                 if (beStat.getLoadScore(medium) > avgLoadScore) {
-                    beStat.setClazz(medium, Classification.HIGH);
-                    highCounter++;
+                    clazz = Classification.HIGH;
                 } else if (beStat.getLoadScore(medium) < avgLoadScore) {
-                    beStat.setClazz(medium, Classification.LOW);
-                    lowCounter++;
+                    clazz = Classification.LOW;
                 }
             } else {
                 if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / 
avgLoadScore
                         > Config.balance_load_score_threshold) {
                     if (beStat.getLoadScore(medium) > avgLoadScore) {
-                        beStat.setClazz(medium, Classification.HIGH);
-                        highCounter++;
+                        clazz = Classification.HIGH;
                     } else if (beStat.getLoadScore(medium) < avgLoadScore) {
-                        beStat.setClazz(medium, Classification.LOW);
-                        lowCounter++;
+                        clazz = Classification.LOW;
                     }
-                } else {
-                    beStat.setClazz(medium, Classification.MID);
-                    midCounter++;
                 }
             }
+
+            beStat.setClazz(medium, clazz);
+            switch (clazz) {
+                case HIGH:
+                    highCounter++;
+                    break;
+                case LOW:
+                    lowCounter++;
+                    break;
+                case MID:
+                    midCounter++;
+                    break;
+                default:
+                    break;
+            }
         }
 
         LOG.debug("classify backend by load. medium: {} avg load score: {}. 
low/mid/high: {}/{}/{}",
@@ -265,6 +289,11 @@ public class LoadStatisticForTag {
             return false;
         }
 
+        long debugHighBeId = 
DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
+        if (srcBeStat.getBeId() == debugHighBeId) {
+            return true;
+        }
+
         currentSrcBeScore = srcBeStat.getLoadScore(medium);
         currentDestBeScore = destBeStat.getLoadScore(medium);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index c57bdc7762a..11f338f1224 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -666,6 +666,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         List<Replica> decommissionCand = Lists.newArrayList();
         List<Replica> colocateCand = Lists.newArrayList();
         List<Replica> notColocateCand = Lists.newArrayList();
+        List<Replica> furtherRepairs = Lists.newArrayList();
         for (Replica replica : tablet.getReplicas()) {
             if (replica.isBad()) {
                 LOG.debug("replica {} is bad, skip. tablet: {}",
@@ -688,6 +689,11 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
 
             if (replica.getLastFailedVersion() <= 0
                     && replica.getVersion() >= visibleVersion) {
+
+                if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR && 
replica.needFurtherRepair()) {
+                    furtherRepairs.add(replica);
+                }
+
                 // skip healthy replica
                 LOG.debug("replica {} version {} is healthy, visible version 
{}, replica state {}, skip. tablet: {}",
                         replica.getId(), replica.getVersion(), visibleVersion, 
replica.getState(), tabletId);
@@ -709,8 +715,24 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         } else {
             candidates = decommissionCand;
         }
+
         if (candidates.isEmpty()) {
-            throw new SchedException(Status.UNRECOVERABLE, "unable to choose 
dest replica");
+            if (furtherRepairs.isEmpty()) {
+                throw new SchedException(Status.UNRECOVERABLE, "unable to 
choose dest replica");
+            }
+
+            boolean allCatchup = true;
+            for (Replica replica : furtherRepairs) {
+                if (checkFurthurRepairFinish(replica, visibleVersion)) {
+                    replica.setNeedFurtherRepair(false);
+                    replica.setFurtherRepairWatermarkTxnTd(-1);
+                } else {
+                    allCatchup = false;
+                }
+            }
+
+            throw new SchedException(Status.FINISHED,
+                    allCatchup ? "further repair all catchup" : "further 
repair waiting catchup");
         }
 
         Replica chosenReplica = null;
@@ -782,6 +804,32 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash());
     }
 
+    private boolean checkFurthurRepairFinish(Replica replica, long version) {
+        if (replica.getVersion() < version || replica.getLastFailedVersion() > 
0) {
+            return false;
+        }
+
+        long furtherRepairWatermarkTxnTd = 
replica.getFurtherRepairWatermarkTxnTd();
+        if (furtherRepairWatermarkTxnTd < 0) {
+            return true;
+        }
+
+        try {
+            if 
(Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
+                        furtherRepairWatermarkTxnTd, dbId, tblId, 
partitionId)) {
+                LOG.info("replica {} of tablet {} has catchup with further 
repair watermark id {}",
+                        replica, tabletId, furtherRepairWatermarkTxnTd);
+                return true;
+            } else {
+                return false;
+            }
+        } catch (Exception e) {
+            LOG.warn("replica {} of tablet {} check catchup with further 
repair watermark id {} failed",
+                    replica, tabletId, furtherRepairWatermarkTxnTd, e);
+            return true;
+        }
+    }
+
     public void releaseResource(TabletScheduler tabletScheduler) {
         releaseResource(tabletScheduler, false);
     }
@@ -1104,25 +1152,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
             // change from prepare to committed or visible, this replica will 
be fall behind and be removed
             // in REDUNDANT detection.
             //
-            boolean isCatchup = false;
-            if (replica.getVersion() >= partition.getVisibleVersion() && 
replica.getLastFailedVersion() < 0) {
-                long furtherRepairWatermarkTxnTd = 
replica.getFurtherRepairWatermarkTxnTd();
-                if (furtherRepairWatermarkTxnTd < 0) {
-                    isCatchup = true;
-                } else {
-                    try {
-                        if 
(Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
-                                furtherRepairWatermarkTxnTd, dbId, tblId, 
partitionId)) {
-                            isCatchup = true;
-                            LOG.info("new replica {} of tablet {} has catchup 
with further repair watermark id {}",
-                                    replica, tabletId, 
furtherRepairWatermarkTxnTd);
-                        }
-                    } catch (Exception e) {
-                        isCatchup = true;
-                    }
-                }
-            }
-
+            boolean isCatchup = checkFurthurRepairFinish(replica, 
partition.getVisibleVersion());
             replica.incrFurtherRepairCount();
             if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) {
                 replica.setNeedFurtherRepair(false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 572199f941d..1d4592501f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -46,6 +46,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.resource.Tag;
@@ -983,6 +984,7 @@ public class TabletScheduler extends MasterDaemon {
             boolean force, LoadStatisticForTag statistic) throws 
SchedException {
         Replica chosenReplica = null;
         double maxScore = 0;
+        long debugHighBeId = 
DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
         for (Replica replica : replicas) {
             BackendLoadStatistic beStatistic = 
statistic.getBackendLoadStatistic(replica.getBackendId());
             if (beStatistic == null) {
@@ -1007,6 +1009,11 @@ public class TabletScheduler extends MasterDaemon {
                 maxScore = loadScore;
                 chosenReplica = replica;
             }
+
+            if (debugHighBeId > 0 && replica.getBackendId() == debugHighBeId) {
+                chosenReplica = replica;
+                break;
+            }
         }
 
         if (chosenReplica != null) {
@@ -1535,6 +1542,16 @@ public class TabletScheduler extends MasterDaemon {
                     statusPair.second = tabletCtx.getPriority();
                 }
             }
+
+            if (statusPair.first == TabletStatus.NEED_FURTHER_REPAIR) {
+                // replica is just waiting for finishing txns before 
furtherRepairWatermarkTxnTd,
+                // no need to add it immediately
+                Replica replica = 
tablet.getReplicaByBackendId(tabletCtx.getDestBackendId());
+                if (replica != null && replica.getVersion() >= 
partition.getVisibleVersion()
+                        && replica.getLastFailedVersion() < 0) {
+                    return;
+                }
+            }
         } finally {
             tbl.readUnlock();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
index d9cedb2d533..da06232f0c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
@@ -30,6 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Use for manage debug points.
+ *
+ * usage example can see DebugPointUtilTest.java
+ *
  **/
 public class DebugPointUtil {
     private static final Logger LOG = 
LogManager.getLogger(DebugPointUtil.class);
@@ -109,11 +112,34 @@ public class DebugPointUtil {
         return debugPoint;
     }
 
+    // if not enable debug point or its params not contains `key`, then return 
`defaultValue`
+    // url: /api/debug_point/add/name?k1=v1&k2=v2&...
+    public static <E> E getDebugParamOrDefault(String debugPointName, String 
key, E defaultValue) {
+        DebugPoint debugPoint = getDebugPoint(debugPointName);
+
+        return debugPoint != null ? debugPoint.param(key, defaultValue) : 
defaultValue;
+    }
+
+    // url: /api/debug_point/add/name?value=v
+    public static <E> E getDebugParamOrDefault(String debugPointName, E 
defaultValue) {
+        return getDebugParamOrDefault(debugPointName, "value", defaultValue);
+    }
+
     public static void addDebugPoint(String name, DebugPoint debugPoint) {
         debugPoints.put(name, debugPoint);
         LOG.info("add debug point: name={}, params={}", name, 
debugPoint.params);
     }
 
+    public static void addDebugPoint(String name) {
+        addDebugPoint(name, new DebugPoint());
+    }
+
+    public static <E> void addDebugPointWithValue(String name, E value) {
+        DebugPoint debugPoint = new DebugPoint();
+        debugPoint.params.put("value", String.format("%s", value));
+        addDebugPoint(name, debugPoint);
+    }
+
     public static void removeDebugPoint(String name) {
         DebugPoint debugPoint = debugPoints.remove(name);
         LOG.info("remove debug point: name={}, exists={}", name, debugPoint != 
null);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
index 5c6492b0635..0a68885bf26 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
@@ -66,6 +66,12 @@ public class DebugPointUtilTest extends DorisHttpTestCase {
         Assert.assertTrue(debugPoint.param("v4", false));
         Assert.assertFalse(debugPoint.param("v5", false));
         Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L));
+
+        Assert.assertEquals(1, (int) 
DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0));
+        Assert.assertEquals(100, (int) 
DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100));
+
+        sendRequest("/api/debug_point/add/dbug6?value=100");
+        Assert.assertEquals(100, (int) 
DebugPointUtil.getDebugParamOrDefault("dbug6", 0));
     }
 
     private void sendRequest(String uri) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to