This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 895754bf40e [feat](cloud) Add injection point http api for ms (#41982)
895754bf40e is described below
commit 895754bf40eb742a98e754e804211361e1384c82
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Oct 29 23:14:59 2024 +0800
[feat](cloud) Add injection point http api for ms (#41982)
---
build.sh | 1 +
cloud/CMakeLists.txt | 4 +
cloud/src/meta-service/CMakeLists.txt | 1 +
cloud/src/meta-service/injection_point_http.cpp | 226 +++++++++++++++++++++
cloud/src/meta-service/meta_service.cpp | 2 +-
cloud/src/meta-service/meta_service_http.cpp | 4 +
cloud/src/meta-service/meta_service_txn.cpp | 2 +-
cloud/src/meta-service/txn_lazy_committer.cpp | 5 +-
common/cpp/sync_point.h | 2 +-
.../apache/doris/cloud/catalog/CloudPartition.java | 14 +-
10 files changed, 248 insertions(+), 13 deletions(-)
diff --git a/build.sh b/build.sh
index 1da5df76bb2..03298829823 100755
--- a/build.sh
+++ b/build.sh
@@ -631,6 +631,7 @@ if [[ "${BUILD_CLOUD}" -eq 1 ]]; then
-DCMAKE_MAKE_PROGRAM="${MAKE_PROGRAM}" \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \
+ -DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \
-DMAKE_TEST=OFF \
"${CMAKE_USE_CCACHE}" \
-DUSE_LIBCPP="${USE_LIBCPP}" \
diff --git a/cloud/CMakeLists.txt b/cloud/CMakeLists.txt
index 32e60f7bfb5..f8084acf77b 100644
--- a/cloud/CMakeLists.txt
+++ b/cloud/CMakeLists.txt
@@ -411,6 +411,10 @@ if (${MAKE_TEST} STREQUAL "ON")
add_definitions(-DBE_TEST)
endif ()
+if (ENABLE_INJECTION_POINT)
+ add_definitions(-DENABLE_INJECTION_POINT)
+endif()
+
# Add libs if needed, download to current dir -- ${BUILD_DIR}
set(FDB_LIB "fdb_lib_7_1_23.tar.xz")
file(GLOB RELEASE_FILE_LIST LIST_DIRECTORIES false "/etc/*release*")
diff --git a/cloud/src/meta-service/CMakeLists.txt
b/cloud/src/meta-service/CMakeLists.txt
index c7c4887a068..d11f87e7fa2 100644
--- a/cloud/src/meta-service/CMakeLists.txt
+++ b/cloud/src/meta-service/CMakeLists.txt
@@ -12,6 +12,7 @@ add_library(MetaService
meta_server.cpp
meta_service.cpp
meta_service_http.cpp
+ injection_point_http.cpp
meta_service_job.cpp
meta_service_resource.cpp
meta_service_schema.cpp
diff --git a/cloud/src/meta-service/injection_point_http.cpp
b/cloud/src/meta-service/injection_point_http.cpp
new file mode 100644
index 00000000000..80d1bcfdf2e
--- /dev/null
+++ b/cloud/src/meta-service/injection_point_http.cpp
@@ -0,0 +1,226 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/format.h>
+#include <gen_cpp/cloud.pb.h>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "cpp/sync_point.h"
+#include "meta-service/keys.h"
+#include "meta-service/meta_service_helper.h"
+#include "meta-service/txn_kv.h"
+#include "meta-service/txn_kv_error.h"
+#include "meta_service.h"
+#include "meta_service_http.h"
+
+namespace doris::cloud {
+
+std::map<std::string, std::function<void()>> suite_map;
+std::once_flag register_suites_once;
+
+inline std::default_random_engine make_random_engine() {
+ return std::default_random_engine(
+
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+}
+
+static void register_suites() {
+ suite_map.emplace("test_txn_lazy_commit", [] {
+ auto sp = SyncPoint::get_instance();
+
+
sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id",
[&](auto&& args) {
+ std::default_random_engine rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(100, 1000);
+ uint32_t duration_ms = u(rng);
+ LOG(INFO) << "commit_txn_immediately::advance_last_pending_txn_id
sleep " << duration_ms
+ << " ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+ });
+
+ sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit",
[&](auto&& args) {
+ std::default_random_engine rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(100, 1000);
+ uint32_t duration_ms = u(rng);
+ LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_submit
sleep " << duration_ms
+ << " ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+ });
+
+ sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait",
[&](auto&& args) {
+ std::default_random_engine rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(100, 1000);
+ uint32_t duration_ms = u(rng);
+ LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_wait sleep
" << duration_ms
+ << " ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+ });
+
+ sp->set_call_back("convert_tmp_rowsets::before_commit", [&](auto&&
args) {
+ std::default_random_engine rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(1, 50);
+ uint32_t duration_ms = u(rng);
+
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+ LOG(INFO) << "convert_tmp_rowsets::before_commit sleep " <<
duration_ms << " ms";
+ if (duration_ms <= 25) {
+ MetaServiceCode* code =
try_any_cast<MetaServiceCode*>(args[0]);
+ *code = MetaServiceCode::KV_TXN_CONFLICT;
+ bool* pred = try_any_cast<bool*>(args.back());
+ *pred = true;
+ LOG(INFO) << "convert_tmp_rowsets::before_commit
random_value=" << duration_ms
+ << " inject kv txn conflict";
+ }
+ });
+ });
+}
+
+HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
+ std::string duration_str(http_query(uri, "duration"));
+ int64_t duration = 0;
+ try {
+ duration = std::stol(duration_str);
+ } catch (const std::exception& e) {
+ auto msg = fmt::format("invalid duration:{}", duration_str);
+ LOG(WARNING) << msg;
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+ }
+
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back(point, [point, duration](auto&& args) {
+ LOG(INFO) << "injection point hit, point=" << point << " sleep ms=" <<
duration;
+ std::this_thread::sleep_for(std::chrono::milliseconds(duration));
+ });
+ return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse set_return(const std::string& point, const brpc::URI& uri) {
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back(point, [point](auto&& args) {
+ try {
+ LOG(INFO) << "injection point hit, point=" << point << " return
void";
+ auto pred = try_any_cast<bool*>(args.back());
+ *pred = true;
+ } catch (const std::bad_any_cast& e) {
+ LOG(ERROR) << "failed to process `return` e:" << e.what();
+ }
+ });
+
+ return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse handle_set(const brpc::URI& uri) {
+ const std::string point(http_query(uri, "name"));
+ if (point.empty()) {
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty point
name");
+ }
+
+ const std::string behavior(http_query(uri, "behavior"));
+ if (behavior.empty()) {
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty
behavior");
+ }
+ if (behavior == "sleep") {
+ return set_sleep(point, uri);
+ } else if (behavior == "return") {
+ return set_return(point, uri);
+ }
+
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown
behavior: " + behavior);
+}
+
+HttpResponse handle_clear(const brpc::URI& uri) {
+ const std::string point(http_query(uri, "name"));
+ auto* sp = SyncPoint::get_instance();
+ LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)"
: point);
+ if (point.empty()) {
+ // If point name is empty, clear all
+ sp->clear_all_call_backs();
+ return http_json_reply(MetaServiceCode::OK, "OK");
+ }
+ sp->clear_call_back(point);
+ return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse handle_apply_suite(const brpc::URI& uri) {
+ const std::string suite(http_query(uri, "name"));
+ if (suite.empty()) {
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty suite
name");
+ }
+
+ std::call_once(register_suites_once, register_suites);
+ if (auto it = suite_map.find(suite); it != suite_map.end()) {
+ it->second(); // set injection callbacks
+ return http_json_reply(MetaServiceCode::OK, "OK apply suite " + suite
+ "\n");
+ }
+
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown suite:
" + suite + "\n");
+}
+
+HttpResponse handle_enable(const brpc::URI& uri) {
+ SyncPoint::get_instance()->enable_processing();
+ return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse handle_disable(const brpc::URI& uri) {
+ SyncPoint::get_instance()->disable_processing();
+ return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+//
+// enable/disable injection point
+// ```
+// curl
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=enable"
+// curl
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=disable"
+// ```
+
+// clear all injection points
+// ```
+// curl
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear"
+// ```
+
+// apply/activate specific suite with registered action, see
`register_suites()` for more details
+// ```
+// curl
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=apply_suite&name=${suite_name}"
+// ```
+
+// ```
+// curl
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
+// &name=${injection_point_name}&behavior=sleep&duration=${x_millisec}" #
sleep x milliseconds
+
+// curl
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
+// &name=${injection_point_name}&behavior=return" # return void
+// ```
+
+HttpResponse process_injection_point(MetaServiceImpl* service,
brpc::Controller* ctrl) {
+ auto& uri = ctrl->http_request().uri();
+ LOG(INFO) << "handle InjectionPointAction uri:" << uri;
+ const std::string op(http_query(uri, "op"));
+
+ if (op == "set") {
+ return handle_set(uri);
+ } else if (op == "clear") {
+ return handle_clear(uri);
+ } else if (op == "apply_suite") {
+ return handle_apply_suite(uri);
+ } else if (op == "enable") {
+ return handle_enable(uri);
+ } else if (op == "disable") {
+ return handle_disable(uri);
+ }
+
+ return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" +
op);
+}
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 7adbc8ccf12..018c30316fe 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1498,7 +1498,7 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn="
<< version_pb.pending_txn_ids(0) << "
code=" << code
- << "msg=" << msg;
+ << " msg=" << msg;
return;
}
continue;
diff --git a/cloud/src/meta-service/meta_service_http.cpp
b/cloud/src/meta-service/meta_service_http.cpp
index 9a9f6de97cc..4395bb98190 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -170,6 +170,8 @@ static std::string_view
remove_version_prefix(std::string_view path) {
return path;
}
+HttpResponse process_injection_point(MetaServiceImpl* service,
brpc::Controller* ctrl);
+
static HttpResponse process_alter_cluster(MetaServiceImpl* service,
brpc::Controller* ctrl) {
static std::unordered_map<std::string_view,
AlterClusterRequest::Operation> operations {
{"add_cluster", AlterClusterRequest::ADD_CLUSTER},
@@ -575,11 +577,13 @@ void
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"get_value", process_get_value},
{"show_meta_ranges", process_show_meta_ranges},
{"txn_lazy_commit", process_txn_lazy_commit},
+ {"injection_point", process_injection_point},
{"v1/decode_key", process_decode_key},
{"v1/encode_key", process_encode_key},
{"v1/get_value", process_get_value},
{"v1/show_meta_ranges", process_show_meta_ranges},
{"v1/txn_lazy_commit", process_txn_lazy_commit},
+ {"v1/injection_point", process_injection_point},
// for get
{"get_instance", process_get_instance_info},
{"get_obj_store_info", process_get_obj_store_info},
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index ddd1f8cdf79..cc333c42846 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1894,7 +1894,7 @@ void commit_txn_eventually(
std::pair<MetaServiceCode, std::string> ret = task->wait();
if (ret.first != MetaServiceCode::OK) {
LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << "
code=" << ret.first
- << "msg=" << ret.second;
+ << " msg=" << ret.second;
}
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id ->
stats
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index fe13f7f8352..5d653c2f222 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -21,6 +21,7 @@
#include "common/logging.h"
#include "common/util.h"
+#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
@@ -189,6 +190,7 @@ void convert_tmp_rowsets(
if (code != MetaServiceCode::OK) return;
}
+ TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit",
&code);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
@@ -489,7 +491,8 @@ std::pair<MetaServiceCode, std::string>
TxnLazyCommitTask::wait() {
sw.pause();
if (sw.elapsed_us() > 1000000) {
LOG(INFO) << "txn_lazy_commit task wait more than 1000ms, cost=" <<
sw.elapsed_us() / 1000
- << " ms";
+ << " ms"
+ << " txn_id=" << txn_id_;
}
return std::make_pair(this->code_, this->msg_);
}
diff --git a/common/cpp/sync_point.h b/common/cpp/sync_point.h
index f26e64fe7c3..0378918f627 100644
--- a/common/cpp/sync_point.h
+++ b/common/cpp/sync_point.h
@@ -205,7 +205,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
// TEST_SYNC_POINT is no op in release build.
// Turn on this feature by defining the macro
-#ifndef BE_TEST
+#if !defined(BE_TEST) && !defined(ENABLE_INJECTION_POINT)
# define TEST_SYNC_POINT(x)
# define TEST_IDX_SYNC_POINT(x, index)
# define TEST_SYNC_POINT_CALLBACK(x, ...)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index b2a9751394f..a075680e476 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -253,19 +253,15 @@ public class CloudPartition extends Partition {
LOG.debug("get version from meta service, partitions: {},
versions: {}", partitionIds, versions);
}
- if (isEmptyPartitionPruneDisabled()) {
- ArrayList<Long> news = new ArrayList<>();
- for (Long v : versions) {
- news.add(v == -1 ? 1 : v);
- }
- return news;
- }
-
if (versionUpdateTimesMs != null) {
versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList());
}
- return versions;
+ ArrayList<Long> news = new ArrayList<>();
+ for (Long v : versions) {
+ news.add(v == -1 ? Partition.PARTITION_INIT_VERSION : v);
+ }
+ return news;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]