This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new c22f42121b9 [fix](compaction test) show single replica compaction
status and fix test (#33076) (#34285) (#34438)
c22f42121b9 is described below
commit c22f42121b9d9b2db6905fdf0069c49a0d1f8936
Author: Sun Chenyang <[email protected]>
AuthorDate: Mon May 6 21:00:34 2024 +0800
[fix](compaction test) show single replica compaction status and fix test
(#33076) (#34285) (#34438)
---
be/src/http/action/compaction_action.cpp | 53 +++++++++++++++++++++++--------
be/src/http/action/compaction_action.h | 4 ++-
be/src/olap/single_replica_compaction.cpp | 20 ++++++------
be/src/olap/tablet.cpp | 48 ++++++++++++++++++++++------
be/src/olap/tablet.h | 12 +++++++
5 files changed, 103 insertions(+), 34 deletions(-)
diff --git a/be/src/http/action/compaction_action.cpp
b/be/src/http/action/compaction_action.cpp
index 2b632108392..87c048529f8 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -41,6 +41,7 @@
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/full_compaction.h"
#include "olap/olap_define.h"
+#include "olap/single_replica_compaction.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
@@ -116,6 +117,15 @@ Status
CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotSupported("The compaction type '{}' is not
supported", compaction_type);
}
+ // "remote" = "true" means tablet should do single replica compaction to
fetch rowset from peer
+ bool fetch_from_remote = false;
+ std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
+ if (param_remote == "true") {
+ fetch_from_remote = true;
+ } else if (!param_remote.empty() && param_remote != "false") {
+ return Status::NotSupported("The remote = '{}' is not supported",
param_remote);
+ }
+
if (tablet_id == 0 && table_id != 0) {
std::vector<TabletSharedPtr> tablet_vec =
StorageEngine::instance()->tablet_manager()->get_all_tablet(
@@ -133,9 +143,13 @@ Status
CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotFound("Tablet not found. tablet_id={}",
tablet_id);
}
+ if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
+ return Status::NotSupported("tablet should do compaction locally");
+ }
+
// 3. execute compaction task
- std::packaged_task<Status()> task([this, tablet, compaction_type]() {
- return _execute_compaction_callback(tablet, compaction_type);
+ std::packaged_task<Status()> task([this, tablet, compaction_type,
fetch_from_remote]() {
+ return _execute_compaction_callback(tablet, compaction_type,
fetch_from_remote);
});
std::future<Status> future_obj = task.get_future();
std::thread(std::move(task)).detach();
@@ -241,7 +255,8 @@ Status
CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
}
Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
- const std::string&
compaction_type) {
+ const std::string&
compaction_type,
+ bool fetch_from_remote) {
MonotonicStopWatch timer;
timer.start();
@@ -261,18 +276,29 @@ Status
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
}
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
- CumulativeCompaction cumulative_compaction(tablet);
- res = cumulative_compaction.compact();
- if (!res) {
- if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
- // Ignore this error code.
- VLOG_NOTICE << "failed to init cumulative compaction due to no
suitable version,"
- << "tablet=" << tablet->tablet_id();
- } else {
-
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
- LOG(WARNING) << "failed to do cumulative compaction. res=" <<
res
+ if (fetch_from_remote) {
+ SingleReplicaCompaction single_compaction(tablet,
+
CompactionType::CUMULATIVE_COMPACTION);
+ res = single_compaction.compact();
+ if (!res) {
+ LOG(WARNING) << "failed to do single compaction. res=" << res
<< ", table=" << tablet->tablet_id();
}
+ } else {
+ CumulativeCompaction cumulative_compaction(tablet);
+ res = cumulative_compaction.compact();
+ if (!res) {
+ if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
+ // Ignore this error code.
+ VLOG_NOTICE
+ << "failed to init cumulative compaction due to no
suitable version,"
+ << "tablet=" << tablet->tablet_id();
+ } else {
+
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
+ LOG(WARNING) << "failed to do cumulative compaction. res="
<< res
+ << ", table=" << tablet->tablet_id();
+ }
+ }
}
} else if (compaction_type == PARAM_COMPACTION_FULL) {
FullCompaction full_compaction(tablet);
@@ -288,7 +314,6 @@ Status
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
}
}
-
timer.stop();
LOG(INFO) << "Manual compaction task finish, status=" << res
<< ", compaction_use_time=" << timer.elapsed_time() / 1000000 <<
"ms";
diff --git a/be/src/http/action/compaction_action.h
b/be/src/http/action/compaction_action.h
index 1f0e67c6e92..111a54549ff 100644
--- a/be/src/http/action/compaction_action.h
+++ b/be/src/http/action/compaction_action.h
@@ -40,6 +40,7 @@ const std::string PARAM_COMPACTION_TYPE = "compact_type";
const std::string PARAM_COMPACTION_BASE = "base";
const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
const std::string PARAM_COMPACTION_FULL = "full";
+const std::string PARAM_COMPACTION_REMOTE = "remote";
/// This action is used for viewing the compaction status.
/// See compaction-action.md for details.
@@ -60,7 +61,8 @@ private:
Status _handle_run_compaction(HttpRequest* req, std::string* json_result);
/// thread callback function for the tablet to do compaction
- Status _execute_compaction_callback(TabletSharedPtr tablet, const
std::string& compaction_type);
+ Status _execute_compaction_callback(TabletSharedPtr tablet, const
std::string& compaction_type,
+ bool fethch_from_remote);
/// fetch compaction running status
Status _handle_run_status_compaction(HttpRequest* req, std::string*
json_result);
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index 793f9d497c6..f9bd6549d65 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -66,6 +66,9 @@ Status SingleReplicaCompaction::pick_rowsets_to_compact() {
}
Status SingleReplicaCompaction::execute_compact_impl() {
+ if (!_tablet->should_fetch_from_peer()) {
+ return Status::Aborted("compaction should be performed locally");
+ }
std::unique_lock<std::mutex>
lock_cumu(_tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!lock_cumu.owns_lock()) {
@@ -113,8 +116,7 @@ Status
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
Version proper_version;
// 3. find proper version to fetch
if (!_find_rowset_to_fetch(peer_versions, &proper_version)) {
- LOG(WARNING) << _tablet->tablet_id() << " tablet don't need to fetch,
no matched version";
- return Status::Aborted("no matched version to fetch");
+ return Status::Cancelled("no matched versions for single replica
compaction");
}
// 4. fetch compaction result
@@ -131,6 +133,8 @@ Status
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
_tablet->set_last_full_compaction_success_time(UnixMillis());
}
+ _tablet->set_last_fetched_version(_output_rowset->version());
+
int64_t current_max_version;
{
std::shared_lock rdlock(_tablet->get_header_lock());
@@ -164,23 +168,19 @@ Status
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
addr.brpc_port);
if (stub == nullptr) {
- LOG(WARNING) << "get rowset versions from peer: get rpc stub failed,
host = " << addr.host
- << " port = " << addr.brpc_port;
- return Status::Cancelled("get rpc stub failed");
+ return Status::Aborted("get rpc stub failed");
}
brpc::Controller cntl;
stub->get_tablet_rowset_versions(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
- LOG(WARNING) << "open brpc connection to " << addr.host << " failed: "
<< cntl.ErrorText();
- return Status::Cancelled("open brpc connection failed");
+ return Status::Aborted("open brpc connection failed");
}
if (response.status().status_code() != 0) {
- LOG(WARNING) << "peer " << addr.host << " don't have tablet " <<
_tablet->tablet_id();
- return Status::Cancelled("peer don't have tablet");
+ return Status::Aborted("peer don't have tablet");
}
if (response.versions_size() == 0) {
- return Status::Cancelled("no peer version");
+ return Status::Aborted("no peer version");
}
for (int i = 0; i < response.versions_size(); ++i) {
(*peer_versions)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1227996503a..61a1961edf9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1488,18 +1488,37 @@ void Tablet::get_compaction_status(std::string*
json_result) {
root.GetAllocator());
root.AddMember("last base status", base_compaction_status_value,
root.GetAllocator());
+ // last single replica compaction status
+ // "single replica compaction status": {
+ // "remote peer": "172.100.1.0:10875",
+ // "last failure status": "",
+ // "last fetched rowset": "[8-10]"
+ // }
+ rapidjson::Document status;
+ status.SetObject();
TReplicaInfo replica_info;
std::string dummp_token;
- rapidjson::Value fetch_addr;
if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
- _engine.get_peer_replica_info(tablet_id(), &replica_info,
&dummp_token)) {
+ StorageEngine::instance()->get_peer_replica_info(tablet_id(),
&replica_info,
+ &dummp_token)) {
+ // remote peer
+ rapidjson::Value peer_addr;
std::string addr = replica_info.host + ":" +
std::to_string(replica_info.brpc_port);
- fetch_addr.SetString(addr.c_str(), addr.length(), root.GetAllocator());
- } else {
- // -1 means do compaction locally
- fetch_addr.SetString("-1", root.GetAllocator());
+ peer_addr.SetString(addr.c_str(), addr.length(),
status.GetAllocator());
+ status.AddMember("remote peer", peer_addr, status.GetAllocator());
+ // last failure status
+ rapidjson::Value compaction_status;
+
compaction_status.SetString(_last_single_compaction_failure_status.c_str(),
+
_last_single_compaction_failure_status.length(),
+ status.GetAllocator());
+ status.AddMember("last failure status", compaction_status,
status.GetAllocator());
+ // last fetched rowset
+ rapidjson::Value version;
+ std::string fetched_version = _last_fetched_version.to_string();
+ version.SetString(fetched_version.c_str(), fetched_version.length(),
status.GetAllocator());
+ status.AddMember("last fetched rowset", version,
status.GetAllocator());
+ root.AddMember("single replica compaction status", status,
root.GetAllocator());
}
- root.AddMember("fetch from peer", fetch_addr, root.GetAllocator());
// print all rowsets' version as an array
rapidjson::Document versions_arr;
@@ -1884,13 +1903,24 @@ void
Tablet::execute_single_replica_compaction(SingleReplicaCompaction& compacti
Status res = compaction.execute_compact();
if (!res.ok()) {
set_last_failure_time(this, compaction, UnixMillis());
- LOG(WARNING) << "failed to do single replica compaction. res=" << res
- << ", tablet=" << tablet_id();
+ set_last_single_compaction_failure_status(res.to_string());
+ if (res.is<CANCELLED>()) {
+ VLOG_CRITICAL << "Cannel fetching from the remote peer. res=" <<
res
+ << ", tablet=" << tablet_id();
+ } else {
+ LOG(WARNING) << "failed to do single replica compaction. res=" <<
res
+ << ", tablet=" << tablet_id();
+ }
return;
}
set_last_failure_time(this, compaction, 0);
}
+bool Tablet::should_fetch_from_peer() {
+ return tablet_meta()->tablet_schema()->enable_single_replica_compaction()
&&
+ StorageEngine::instance()->should_fetch_from_peer(tablet_id());
+}
+
std::vector<Version> Tablet::get_all_local_versions() {
std::vector<Version> local_versions;
{
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index ca2fe1d70a4..7a959f7272f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -272,6 +272,12 @@ public:
_last_base_compaction_schedule_millis = millis;
}
+ void set_last_single_compaction_failure_status(std::string status) {
+ _last_single_compaction_failure_status = std::move(status);
+ }
+
+ void set_last_fetched_version(Version version) { _last_fetched_version =
std::move(version); }
+
void delete_all_files();
void check_tablet_path_exists();
@@ -339,6 +345,8 @@ public:
std::string get_last_base_compaction_status() { return
_last_base_compaction_status; }
+ bool should_fetch_from_peer();
+
inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
@@ -692,6 +700,10 @@ private:
std::atomic<int64_t> _last_checkpoint_time;
std::string _last_base_compaction_status;
+ // single replica compaction status
+ std::string _last_single_compaction_failure_status;
+ Version _last_fetched_version;
+
// cumulative compaction policy
std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
std::string_view _cumulative_compaction_type;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]