This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6fe9d5d1af9 [improvement](sync version) fe sync version with be
(#25718)
6fe9d5d1af9 is described below
commit 6fe9d5d1af9b9d5475a04925aea5f9e365108255
Author: yujun <[email protected]>
AuthorDate: Sun Oct 22 00:47:56 2023 +0800
[improvement](sync version) fe sync version with be (#25718)
---
be/src/http/action/debug_point_action.cpp | 19 ++-
be/src/util/debug_points.cpp | 39 +++--
be/src/util/debug_points.h | 72 ++++++++-
be/test/util/debug_points_test.cpp | 40 +++++
.../java/org/apache/doris/catalog/Replica.java | 36 ++++-
.../apache/doris/catalog/TabletInvertedIndex.java | 22 ++-
.../org/apache/doris/clone/TabletSchedCtx.java | 7 +
.../apache/doris/common/util/DebugPointUtil.java | 95 +++++++++--
.../apache/doris/httpv2/rest/DebugPointAction.java | 25 ++-
.../java/org/apache/doris/master/MasterImpl.java | 9 +-
.../org/apache/doris/master/ReportHandler.java | 42 +++--
.../org/apache/doris/clone/RepairVersionTest.java | 177 +++++++++++++++++++++
.../doris/common/util/DebugPointUtilTest.java | 18 +++
.../apache/doris/utframe/TestWithFeService.java | 2 +-
14 files changed, 533 insertions(+), 70 deletions(-)
diff --git a/be/src/http/action/debug_point_action.cpp
b/be/src/http/action/debug_point_action.cpp
index 08b1e116b2b..04aa38efaa4 100644
--- a/be/src/http/action/debug_point_action.cpp
+++ b/be/src/http/action/debug_point_action.cpp
@@ -21,6 +21,7 @@
#include "http/http_channel.h"
#include "http/http_status.h"
#include "util/debug_points.h"
+#include "util/time.h"
namespace doris {
@@ -43,17 +44,16 @@ void BaseDebugPointAction::handle(HttpRequest* req) {
}
Status AddDebugPointAction::_handle(HttpRequest* req) {
- std::string debug_point = req->param("debug_point");
+ std::string name = req->param("debug_point");
std::string execute = req->param("execute");
std::string timeout = req->param("timeout");
- if (debug_point.empty()) {
+ if (name.empty()) {
return Status::InternalError("Empty debug point name");
}
- int64_t execute_limit = -1;
- int64_t timeout_second = -1;
+ auto debug_point = std::make_shared<DebugPoint>();
try {
if (!execute.empty()) {
- execute_limit = std::stol(execute);
+ debug_point->execute_limit = std::stol(execute);
}
} catch (const std::exception& e) {
return Status::InternalError("Invalid execute limit format, execute
{}, err {}", execute,
@@ -61,14 +61,19 @@ Status AddDebugPointAction::_handle(HttpRequest* req) {
}
try {
if (!timeout.empty()) {
- timeout_second = std::stol(timeout);
+ int64_t timeout_second = std::stol(timeout);
+ if (timeout_second > 0) {
+ debug_point->expire_ms = MonotonicMillis() + timeout_second *
MILLIS_PER_SEC;
+ }
}
} catch (const std::exception& e) {
return Status::InternalError("Invalid timeout format, timeout {}, err
{}", timeout,
e.what());
}
- DebugPoints::instance()->add(debug_point, execute_limit, timeout_second);
+ debug_point->params = *(req->params());
+
+ DebugPoints::instance()->add(name, debug_point);
return Status::OK();
}
diff --git a/be/src/util/debug_points.cpp b/be/src/util/debug_points.cpp
index 587f8c944a3..43bb39df9a4 100644
--- a/be/src/util/debug_points.cpp
+++ b/be/src/util/debug_points.cpp
@@ -30,37 +30,42 @@ DebugPoints* DebugPoints::instance() {
}
bool DebugPoints::is_enable(const std::string& name) {
+ return get_debug_point(name) != nullptr;
+}
+
+std::shared_ptr<DebugPoint> DebugPoints::get_debug_point(const std::string&
name) {
if (!config::enable_debug_points) {
- return false;
+ return nullptr;
}
auto map_ptr = std::atomic_load_explicit(&_debug_points,
std::memory_order_relaxed);
auto it = map_ptr->find(name);
if (it == map_ptr->end()) {
- return false;
+ return nullptr;
}
- auto& debug_point = *(it->second);
- if ((debug_point.expire_ms > 0 && MonotonicMillis() >=
debug_point.expire_ms) ||
- (debug_point.execute_limit > 0 &&
- debug_point.execute_num.fetch_add(1, std::memory_order_relaxed) >=
- debug_point.execute_limit)) {
+ auto debug_point = it->second;
+ if ((debug_point->expire_ms > 0 && MonotonicMillis() >=
debug_point->expire_ms) ||
+ (debug_point->execute_limit > 0 &&
+ debug_point->execute_num.fetch_add(1, std::memory_order_relaxed) >=
+ debug_point->execute_limit)) {
remove(name);
- return false;
+ return nullptr;
}
- return true;
+ return debug_point;
}
-void DebugPoints::add(const std::string& name, int64_t execute_limit, int64_t
timeout_second) {
- auto debug_point = std::make_shared<DebugPoint>();
- debug_point->execute_limit = execute_limit;
- if (timeout_second > 0) {
- debug_point->expire_ms = MonotonicMillis() + timeout_second *
MILLIS_PER_SEC;
- }
+void DebugPoints::add(const std::string& name, std::shared_ptr<DebugPoint>
debug_point) {
update([&](DebugPointMap& new_points) { new_points[name] = debug_point; });
- LOG(INFO) << "add debug point: name=" << name << ", execute=" <<
execute_limit
- << ", timeout=" << timeout_second;
+ std::ostringstream oss;
+ oss << "{";
+ for (auto [key, value] : debug_point->params) {
+ oss << key << " : " << value << ", ";
+ }
+ oss << "}";
+
+ LOG(INFO) << "add debug point: name=" << name << ", params=" << oss.str();
}
void DebugPoints::remove(const std::string& name) {
diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h
index 704405689cc..1106a548f8d 100644
--- a/be/src/util/debug_points.h
+++ b/be/src/util/debug_points.h
@@ -18,19 +18,23 @@
#pragma once
#include <atomic>
+#include <boost/lexical_cast.hpp>
#include <functional>
#include <map>
#include <memory>
-#include <string>
+#include <type_traits>
#include "common/compiler_util.h"
#include "common/config.h"
+#include "fmt/format.h"
-#define DBUG_EXECUTE_IF(debug_point, code) \
- if (UNLIKELY(config::enable_debug_points)) { \
- if (DebugPoints::instance()->is_enable(debug_point)) { \
- code; \
- } \
+// for more usage examples, see 'util/debug_points_test.cpp'
+#define DBUG_EXECUTE_IF(debug_point_name, code) \
+ if (UNLIKELY(config::enable_debug_points)) { \
+ auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
+ if (dp) { \
+ code; \
+ } \
}
namespace doris {
@@ -39,15 +43,69 @@ struct DebugPoint {
std::atomic<int64_t> execute_num {0};
int64_t execute_limit = -1;
int64_t expire_ms = -1;
+
+ std::map<std::string, std::string> params;
+
+ template <typename T>
+ T param(const std::string& key, T default_value = T()) {
+ auto it = params.find(key);
+ if (it == params.end()) {
+ return default_value;
+ }
+ if constexpr (std::is_same_v<T, bool>) {
+ if (it->second == "true") {
+ return true;
+ }
+ if (it->second == "false") {
+ return false;
+ }
+ return boost::lexical_cast<T>(it->second);
+ } else if constexpr (std::is_arithmetic_v<T>) {
+ return boost::lexical_cast<T>(it->second);
+ } else if constexpr (std::is_same_v<T, const char*>) {
+ return it->second.c_str();
+ } else {
+ static_assert(std::is_same_v<T, std::string>);
+ return it->second;
+ }
+ }
};
class DebugPoints {
public:
bool is_enable(const std::string& name);
- void add(const std::string& name, int64_t execute_limit, int64_t
timeout_second);
+ std::shared_ptr<DebugPoint> get_debug_point(const std::string& name);
void remove(const std::string& name);
void clear();
+ // if the debug point is not enabled or its params do not contain `key`, return `default_value`
+ // url: /api/debug_point/add/name?k1=v1&k2=v2&...
+ template <typename T>
+ T get_debug_param_or_default(const std::string& name, const std::string&
key,
+ const T& default_value) {
+ auto debug_point = get_debug_point(name);
+ return debug_point ? debug_point->param(key, default_value) :
default_value;
+ }
+
+ // url: /api/debug_point/add/name?value=v
+ template <typename T>
+ T get_debug_param_or_default(const std::string& name, const T&
default_value) {
+ return get_debug_param_or_default(name, "value", default_value);
+ }
+
+ void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);
+
+ // additional 'add' overloads for convenient use
+ void add(const std::string& name) { add(name,
std::make_shared<DebugPoint>()); }
+ void add_with_params(const std::string& name,
+ const std::map<std::string, std::string>& params) {
+ add(name, std::shared_ptr<DebugPoint>(new DebugPoint {.params =
params}));
+ }
+ template <typename T>
+ void add_with_value(const std::string& name, const T& value) {
+ add_with_params(name, {{"value", fmt::format("{}", value)}});
+ }
+
static DebugPoints* instance();
private:
diff --git a/be/test/util/debug_points_test.cpp
b/be/test/util/debug_points_test.cpp
index c2cf2bdedfd..76c4fd00781 100644
--- a/be/test/util/debug_points_test.cpp
+++ b/be/test/util/debug_points_test.cpp
@@ -54,6 +54,46 @@ TEST(DebugPointsTest, BaseTest) {
EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug4"));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
EXPECT_FALSE(DebugPoints::instance()->is_enable("dbug4"));
+
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false");
+ EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug5"));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param<int>("v1", 100)));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2")));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string())));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b")));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3")));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0)));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false)));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_FALSE(dp->param("v5", false)));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist")));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L)));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123)));
+ DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist",
"abcd")));
+
+ EXPECT_EQ(1.2,
DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0));
+ EXPECT_EQ(100,
+
DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k",
100));
+
+ POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567");
+ EXPECT_EQ(567,
DebugPoints::instance()->get_debug_param_or_default("dbug6", 0));
+}
+
+TEST(DebugPointsTest, AddTest) {
+ config::enable_debug_points = true;
+ DebugPoints::instance()->clear();
+
+ DebugPoints::instance()->add("dbug1");
+ EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1"));
+
+ DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}});
+ EXPECT_EQ(100,
DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0));
+
+ DebugPoints::instance()->add_with_value("dbug3", 567);
+ EXPECT_EQ(567,
DebugPoints::instance()->get_debug_param_or_default("dbug3", 567));
+
+ DebugPoints::instance()->add_with_value("dbug4", "hello");
+ EXPECT_EQ("hello",
+
DebugPoints::instance()->get_debug_param_or_default<std::string>("dbug4", ""));
}
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index e55eab89392..631f2ebaf3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.thrift.TUniqueId;
import com.google.gson.annotations.SerializedName;
@@ -114,6 +115,14 @@ public class Replica implements Writable {
private TUniqueId cooldownMetaId;
private long cooldownTerm = -1;
+ // A replica version should increase monotonically,
+ // but a backend may miss some versions due to disk failure or bugs.
+ // FE should detect this and mark the replica as missing versions.
+ // If the backend's reported version < FE version, record the backend's reported version as `regressiveVersion`,
+ // and if this persists for 5 min or longer, FE should mark this replica as missing versions.
+ private long regressiveVersion = -1;
+ private long regressiveVersionTimestamp = 0;
+
/*
* This can happen when this replica is created by a balance clone task,
and
* when task finished, the version of this replica is behind the
partition's visible version.
@@ -435,9 +444,9 @@ public class Replica implements Writable {
if (lastFailedVersion != this.lastFailedVersion) {
// Case 2:
- if (lastFailedVersion > this.lastFailedVersion) {
+ if (lastFailedVersion > this.lastFailedVersion ||
lastFailedVersion < 0) {
this.lastFailedVersion = lastFailedVersion;
- this.lastFailedTimestamp = System.currentTimeMillis();
+ this.lastFailedTimestamp = lastFailedVersion > 0 ?
System.currentTimeMillis() : -1L;
}
this.lastSuccessVersion = this.version;
@@ -506,10 +515,6 @@ public class Replica implements Writable {
return true;
}
- public void setLastFailedVersion(long lastFailedVersion) {
- this.lastFailedVersion = lastFailedVersion;
- }
-
public void setState(ReplicaState replicaState) {
this.state = replicaState;
}
@@ -534,6 +539,25 @@ public class Replica implements Writable {
this.versionCount = versionCount;
}
+ public boolean checkVersionRegressive(long newVersion) {
+ if (newVersion >= version) {
+ regressiveVersion = -1;
+ regressiveVersionTimestamp = -1;
+ return false;
+ }
+
+ if (DebugPointUtil.isEnable("Replica.regressive_version_immediately"))
{
+ return true;
+ }
+
+ if (newVersion != regressiveVersion) {
+ regressiveVersion = newVersion;
+ regressiveVersionTimestamp = System.currentTimeMillis();
+ }
+
+ return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 *
60 * 1000L;
+ }
+
@Override
public String toString() {
StringBuilder strBuffer = new StringBuilder("[replicaId=");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 2b601f9f030..a2d5983aac4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -390,10 +390,22 @@ public class TabletInvertedIndex {
if (backendTabletInfo.getVersion() > versionInFe) {
// backend replica's version is larger or newer than replica in
FE, sync it.
return true;
- } else if (versionInFe == backendTabletInfo.getVersion() &&
replicaInFe.isBad()) {
+ } else if (versionInFe == backendTabletInfo.getVersion()) {
// backend replica's version is equal to replica in FE, but
replica in FE is bad,
// while backend replica is good, sync it
- return true;
+ if (replicaInFe.isBad()) {
+ return true;
+ }
+
+ // FE's replica last failed version > partition's committed version.
+ // This can occur when BE reports a missing version: FE then sets last failed version = replica's version + 1,
+ // so the last failed version may be greater than the partition's committed version.
+ //
+ // The partition is not available here, so we only check lastFailedVersion == version + 1;
+ // ReportHandler.sync checks again whether last failed version > partition's committed version.
+ if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
+ return true;
+ }
}
return false;
@@ -501,6 +513,12 @@ public class TabletInvertedIndex {
// so we only return true if version_miss is true.
return true;
}
+
+ // backend's version regressed (e.g. due to bugs)
+ if
(replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
+ return true;
+ }
+
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index b4667f80696..3f52210e8f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -1074,6 +1074,13 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
replica.updateVersionInfo(reportedTablet.getVersion(),
reportedTablet.getDataSize(),
reportedTablet.getDataSize(),
reportedTablet.getRowCount());
+ if (replica.getLastFailedVersion() >
partition.getCommittedVersion()
+ && reportedTablet.getVersion() >=
partition.getCommittedVersion()
+ //&& !(reportedTablet.isSetVersionMiss() &&
reportedTablet.isVersionMiss()
+ && !(reportedTablet.isSetUsed() &&
!reportedTablet.isUsed())) {
+ LOG.info("change replica {} of tablet {} 's last failed
version to -1", replica, tabletId);
+ replica.updateLastFailedVersion(-1L);
+ }
if (reportedTablet.isSetPathHash()) {
replica.setPathHash(reportedTablet.getPathHash());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
index aab9b8f2ba6..da06232f0c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
@@ -19,6 +19,8 @@ package org.apache.doris.common.util;
import org.apache.doris.common.Config;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -28,45 +30,114 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Use for manage debug points.
+ *
+ * usage example can see DebugPointUtilTest.java
+ *
**/
public class DebugPointUtil {
private static final Logger LOG =
LogManager.getLogger(DebugPointUtil.class);
private static final Map<String, DebugPoint> debugPoints = new
ConcurrentHashMap<>();
- private static class DebugPoint {
+ public static class DebugPoint {
public AtomicInteger executeNum = new AtomicInteger(0);
public int executeLimit = -1;
public long expireTime = -1;
+
+ // params
+ public Map<String, String> params = Maps.newHashMap();
+
+ public <E> E param(String key, E defaultValue) {
+ Preconditions.checkState(defaultValue != null);
+
+ String value = params.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (defaultValue instanceof Boolean) {
+ return (E) Boolean.valueOf(value);
+ }
+ if (defaultValue instanceof Byte) {
+ return (E) Byte.valueOf(value);
+ }
+ if (defaultValue instanceof Character) {
+ Preconditions.checkState(value.length() == 1);
+ return (E) Character.valueOf(value.charAt(0));
+ }
+ if (defaultValue instanceof Short) {
+ return (E) Short.valueOf(value);
+ }
+ if (defaultValue instanceof Integer) {
+ return (E) Integer.valueOf(value);
+ }
+ if (defaultValue instanceof Long) {
+ return (E) Long.valueOf(value);
+ }
+ if (defaultValue instanceof Float) {
+ return (E) Float.valueOf(value);
+ }
+ if (defaultValue instanceof Double) {
+ return (E) Double.valueOf(value);
+ }
+ if (defaultValue instanceof String) {
+ return (E) value;
+ }
+
+ Preconditions.checkState(false, "Can not convert with default
value=" + defaultValue);
+
+ return defaultValue;
+ }
}
public static boolean isEnable(String debugPointName) {
+ return getDebugPoint(debugPointName) != null;
+ }
+
+ public static DebugPoint getDebugPoint(String debugPointName) {
if (!Config.enable_debug_points) {
- return false;
+ return null;
}
DebugPoint debugPoint = debugPoints.get(debugPointName);
if (debugPoint == null) {
- return false;
+ return null;
}
if ((debugPoint.expireTime > 0 && System.currentTimeMillis() >=
debugPoint.expireTime)
|| (debugPoint.executeLimit > 0 &&
debugPoint.executeNum.incrementAndGet() > debugPoint.executeLimit)) {
debugPoints.remove(debugPointName);
- return false;
+ return null;
}
- return true;
+ return debugPoint;
}
- public static void addDebugPoint(String name, int executeLimit, long
timeoutSecond) {
- DebugPoint debugPoint = new DebugPoint();
- debugPoint.executeLimit = executeLimit;
- if (timeoutSecond > 0) {
- debugPoint.expireTime = System.currentTimeMillis() + timeoutSecond
* 1000;
- }
+ // if the debug point is not enabled or its params do not contain `key`, return `defaultValue`
+ // url: /api/debug_point/add/name?k1=v1&k2=v2&...
+ public static <E> E getDebugParamOrDefault(String debugPointName, String
key, E defaultValue) {
+ DebugPoint debugPoint = getDebugPoint(debugPointName);
+
+ return debugPoint != null ? debugPoint.param(key, defaultValue) :
defaultValue;
+ }
+
+ // url: /api/debug_point/add/name?value=v
+ public static <E> E getDebugParamOrDefault(String debugPointName, E
defaultValue) {
+ return getDebugParamOrDefault(debugPointName, "value", defaultValue);
+ }
+
+ public static void addDebugPoint(String name, DebugPoint debugPoint) {
debugPoints.put(name, debugPoint);
- LOG.info("add debug point: name={}, execute={}, timeout seconds={}",
name, executeLimit, timeoutSecond);
+ LOG.info("add debug point: name={}, params={}", name,
debugPoint.params);
+ }
+
+ public static void addDebugPoint(String name) {
+ addDebugPoint(name, new DebugPoint());
+ }
+
+ public static <E> void addDebugPointWithValue(String name, E value) {
+ DebugPoint debugPoint = new DebugPoint();
+ debugPoint.params.put("value", String.format("%s", value));
+ addDebugPoint(name, debugPoint);
}
public static void removeDebugPoint(String name) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
index 25ee7a5d0a6..8c102fd0ae4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -40,7 +41,7 @@ import javax.servlet.http.HttpServletResponse;
public class DebugPointAction extends RestBaseController {
@RequestMapping(path = "/api/debug_point/add/{debugPoint}", method =
RequestMethod.POST)
- protected Object addDebugPoint(@PathVariable("debugPoint") String
debugPoint,
+ protected Object addDebugPoint(@PathVariable("debugPoint") String name,
@RequestParam(name = "execute", required = false, defaultValue =
"") String execute,
@RequestParam(name = "timeout", required = false, defaultValue =
"") String timeout,
HttpServletRequest request, HttpServletResponse response) {
@@ -50,28 +51,38 @@ public class DebugPointAction extends RestBaseController {
}
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
- if (Strings.isNullOrEmpty(debugPoint)) {
+ if (Strings.isNullOrEmpty(name)) {
return ResponseEntityBuilder.badRequest("Empty debug point name.");
}
- int executeLimit = -1;
+
+ DebugPoint debugPoint = new DebugPoint();
if (!Strings.isNullOrEmpty(execute)) {
try {
- executeLimit = Integer.valueOf(execute);
+ debugPoint.executeLimit = Integer.valueOf(execute);
} catch (Exception e) {
return ResponseEntityBuilder.badRequest(
"Invalid execute format: " + execute + ", err " +
e.getMessage());
}
}
- long timeoutSeconds = -1;
if (!Strings.isNullOrEmpty(timeout)) {
try {
- timeoutSeconds = Long.valueOf(timeout);
+ long timeoutSeconds = Long.valueOf(timeout);
+ if (timeoutSeconds > 0) {
+ debugPoint.expireTime = System.currentTimeMillis() +
timeoutSeconds * 1000;
+ }
} catch (Exception e) {
return ResponseEntityBuilder.badRequest(
"Invalid timeout format: " + timeout + ", err " +
e.getMessage());
}
}
- DebugPointUtil.addDebugPoint(debugPoint, executeLimit, timeoutSeconds);
+ request.getParameterMap().forEach((key, values) -> {
+ if (values != null && values.length > 0) {
+ debugPoint.params.put(key, values[0]);
+ }
+ });
+
+ DebugPointUtil.addDebugPoint(name, debugPoint);
+
return ResponseEntityBuilder.ok();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 2833eff5f3d..64b771663b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -192,7 +192,7 @@ public class MasterImpl {
finishRecoverTablet(task);
break;
case ALTER:
- finishAlterTask(task);
+ finishAlterTask(task, request);
break;
case ALTER_INVERTED_INDEX:
finishAlterInvertedIndexTask(task, request);
@@ -575,7 +575,7 @@ public class MasterImpl {
return reportHandler.handleReport(request);
}
- private void finishAlterTask(AgentTask task) {
+ private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
AlterReplicaTask alterTask = (AlterReplicaTask) task;
try {
if (alterTask.getJobType() == JobType.ROLLUP) {
@@ -584,6 +584,11 @@ public class MasterImpl {
Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
}
alterTask.setFinished(true);
+ if (request.isSetReportVersion()) {
+ long reportVersion = request.getReportVersion();
+ Env.getCurrentSystemInfo().updateBackendReportVersion(
+ task.getBackendId(), reportVersion, task.getDbId(),
task.getTableId());
+ }
} catch (MetaNotFoundException e) {
LOG.warn("failed to handle finish alter task: {}, {}",
task.getSignature(), e.getMessage());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index bfe1bb0a9e2..2de0c4e5050 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -403,7 +403,8 @@ public class ReportHandler extends Daemon {
}
}
- private static void tabletReport(long backendId, Map<Long, TTablet>
backendTablets, long backendReportVersion) {
+ // public for FE unit tests
+ public static void tabletReport(long backendId, Map<Long, TTablet>
backendTablets, long backendReportVersion) {
long start = System.currentTimeMillis();
LOG.info("backend[{}] reports {} tablet(s). report version: {}",
backendId, backendTablets.size(), backendReportVersion);
@@ -607,6 +608,11 @@ public class ReportHandler extends Daemon {
if (olapTable == null || !olapTable.writeLockIfExist()) {
continue;
}
+
+ if (backendReportVersion <
Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
+ break;
+ }
+
try {
long partitionId = tabletMeta.getPartitionId();
Partition partition = olapTable.getPartition(partitionId);
@@ -660,14 +666,25 @@ public class ReportHandler extends Daemon {
continue;
}
- if (metaVersion < backendVersion
- || (metaVersion == backendVersion &&
replica.isBad())) {
-
- if (backendReportVersion <
Env.getCurrentSystemInfo()
- .getBackendReportVersion(backendId)) {
- continue;
+ boolean needSync = false;
+ if (metaVersion < backendVersion) {
+ needSync = true;
+ } else if (metaVersion == backendVersion) {
+ if (replica.isBad()) {
+ needSync = true;
}
+ if (replica.getVersion() >=
partition.getCommittedVersion()
+ && replica.getLastFailedVersion() >
partition.getCommittedVersion()) {
+ LOG.info("sync replica {} of tablet {} in
backend {} in db {}. replica last failed"
+ + " version change to -1 because last
failed version > replica's committed"
+ + " version {}",
+ replica, tabletId, backendId, dbId,
partition.getCommittedVersion());
+ replica.updateLastFailedVersion(-1L);
+ needSync = true;
+ }
+ }
+ if (needSync) {
// happens when
// 1. PUSH finished in BE but failed or not yet
report to FE
// 2. repair for VERSION_INCOMPLETE finished in
BE, but failed or not yet report to FE
@@ -1048,18 +1065,25 @@ public class ReportHandler extends Daemon {
break;
}
- if (tTabletInfo.isSetVersionMiss() &&
tTabletInfo.isVersionMiss()) {
+ if ((tTabletInfo.isSetVersionMiss() &&
tTabletInfo.isVersionMiss())
+ ||
replica.checkVersionRegressive(tTabletInfo.getVersion())) {
// If the origin last failed version is larger
than 0, not change it.
// Otherwise, we set last failed version to
replica'version + 1.
// Because last failed version should always
larger than replica's version.
long newLastFailedVersion =
replica.getLastFailedVersion();
if (newLastFailedVersion < 0) {
newLastFailedVersion =
replica.getVersion() + 1;
+
replica.updateLastFailedVersion(newLastFailedVersion);
+ LOG.warn("set missing version for replica
{} of tablet {} on backend {}, "
+ + "version in fe {}, version in be
{}, be missing {}",
+ replica.getId(), tabletId,
backendId, replica.getVersion(),
+ tTabletInfo.getVersion(),
tTabletInfo.isVersionMiss());
}
-
replica.updateLastFailedVersion(newLastFailedVersion);
backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
break;
}
+
+ break;
}
}
} finally {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
new file mode 100644
index 00000000000..f56a55d5874
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
+import org.apache.doris.master.ReportHandler;
+import org.apache.doris.thrift.TTablet;
+import org.apache.doris.thrift.TTabletInfo;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class RepairVersionTest extends TestWithFeService {
+ private class TableInfo {
+ Partition partition;
+ Tablet tablet;
+ Replica replica;
+ }
+
+ @Override
+ protected void beforeCreatingConnectContext() throws Exception {
+ Config.enable_debug_points = true;
+ Config.disable_balance = true;
+ Config.disable_tablet_scheduler = true;
+ Config.allow_replica_on_same_host = true;
+ FeConstants.tablet_checker_interval_ms = 100;
+ FeConstants.tablet_schedule_interval_ms = 100;
+ }
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ }
+
+ @Override
+ protected int backendNum() {
+ return 2;
+ }
+
+ @Test
+ public void testRepairLastFailedVersionByClone() throws Exception {
+ TableInfo info =
prepareTableForTest("tbl_repair_last_fail_version_by_clone");
+ Partition partition = info.partition;
+ Replica replica = info.replica;
+
+ replica.updateLastFailedVersion(replica.getVersion() + 1);
+ Assertions.assertEquals(partition.getCommittedVersion() + 1,
replica.getLastFailedVersion());
+
+ Config.disable_tablet_scheduler = false;
+ Thread.sleep(1000);
+ Config.disable_tablet_scheduler = true;
+
+ Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
+ Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+ }
+
+ @Test
+ public void testRepairLastFailedVersionByReport() throws Exception {
+ TableInfo info =
prepareTableForTest("tbl_repair_last_fail_version_by_report");
+ Partition partition = info.partition;
+ Tablet tablet = info.tablet;
+ Replica replica = info.replica;
+
+ replica.updateLastFailedVersion(replica.getVersion() + 1);
+ Assertions.assertEquals(partition.getCommittedVersion() + 1,
replica.getLastFailedVersion());
+
+ TTabletInfo tTabletInfo = new TTabletInfo();
+ tTabletInfo.setTabletId(tablet.getId());
+ tTabletInfo.setSchemaHash(replica.getSchemaHash());
+ tTabletInfo.setVersion(replica.getVersion());
+ tTabletInfo.setPathHash(replica.getPathHash());
+ tTabletInfo.setPartitionId(partition.getId());
+ tTabletInfo.setReplicaId(replica.getId());
+
+ TTablet tTablet = new TTablet();
+ tTablet.addToTabletInfos(tTabletInfo);
+ Map<Long, TTablet> tablets = Maps.newHashMap();
+ tablets.put(tablet.getId(), tTablet);
+ Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
+
+ ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+
+ Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
+ Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+ }
+
+ @Test
+ public void testVersionRegressive() throws Exception {
+ TableInfo info = prepareTableForTest("tbl_version_regressive");
+ Partition partition = info.partition;
+ Tablet tablet = info.tablet;
+ Replica replica = info.replica;
+
+ Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
+ Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+ Assertions.assertTrue(replica.getVersion() > 1L);
+
+ TTabletInfo tTabletInfo = new TTabletInfo();
+ tTabletInfo.setTabletId(tablet.getId());
+ tTabletInfo.setSchemaHash(replica.getSchemaHash());
+ tTabletInfo.setVersion(1L); // be report version = 1 which less than
fe version
+ tTabletInfo.setPathHash(replica.getPathHash());
+ tTabletInfo.setPartitionId(partition.getId());
+ tTabletInfo.setReplicaId(replica.getId());
+
+ TTablet tTablet = new TTablet();
+ tTablet.addToTabletInfos(tTabletInfo);
+ Map<Long, TTablet> tablets = Maps.newHashMap();
+ tablets.put(tablet.getId(), tTablet);
+
+ ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+ Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+
+ DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately",
new DebugPoint());
+ ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+ Assertions.assertEquals(replica.getVersion() + 1,
replica.getLastFailedVersion());
+
+ Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
+ }
+
+ private TableInfo prepareTableForTest(String tableName) throws Exception {
+ createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED
BY HASH(k) "
+ + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )");
+
+ Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test");
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName);
+ Assertions.assertNotNull(tbl);
+ Partition partition = tbl.getPartitions().iterator().next();
+ Tablet tablet =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+ long visibleVersion = 2L;
+ partition.updateVisibleVersion(visibleVersion);
+ partition.setNextVersion(visibleVersion + 1);
+ tablet.getReplicas().forEach(replica ->
replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
+
+ Replica replica = tablet.getReplicas().iterator().next();
+ Assertions.assertEquals(visibleVersion, replica.getVersion());
+ Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+
+ TableInfo info = new TableInfo();
+ info.partition = partition;
+ info.tablet = tablet;
+ info.replica = replica;
+
+ return info;
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
index 2845cec9225..0a68885bf26 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.common.util;
import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.http.DorisHttpTestCase;
import okhttp3.Request;
@@ -54,6 +55,23 @@ public class DebugPointUtilTest extends DorisHttpTestCase {
Assert.assertTrue(DebugPointUtil.isEnable("dbug4"));
Thread.sleep(1000);
Assert.assertFalse(DebugPointUtil.isEnable("dbug4"));
+
+
sendRequest("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false");
+ Assert.assertTrue(DebugPointUtil.isEnable("dbug5"));
+ DebugPoint debugPoint = DebugPointUtil.getDebugPoint("dbug5");
+ Assert.assertNotNull(debugPoint);
+ Assert.assertEquals(1, (int) debugPoint.param("v1", 0));
+ Assert.assertEquals("a", debugPoint.param("v2", ""));
+ Assert.assertEquals(1.2, debugPoint.param("v3", 0.0), 1e-6);
+ Assert.assertTrue(debugPoint.param("v4", false));
+ Assert.assertFalse(debugPoint.param("v5", false));
+ Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L));
+
+ Assert.assertEquals(1, (int)
DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0));
+ Assert.assertEquals(100, (int)
DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100));
+
+ sendRequest("/api/debug_point/add/dbug6?value=100");
+ Assert.assertEquals(100, (int)
DebugPointUtil.getDebugParamOrDefault("dbug6", 0));
}
private void sendRequest(String uri) throws Exception {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index a17fcdf72bc..ac4fa2660db 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -392,7 +392,7 @@ public abstract class TestWithFeService {
InterruptedException {
int feRpcPort = startFEServer(runningDir);
List<Backend> bes = Lists.newArrayList();
- System.out.println("start create backend");
+ System.out.println("start create backend, backend num " + backendNum);
for (int i = 0; i < backendNum; i++) {
bes.add(createBackend("127.0.0.1", feRpcPort));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]