This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c22f42121b9 [fix](compaction test) show single replica compaction 
status and fix test (#33076) (#34285) (#34438)
c22f42121b9 is described below

commit c22f42121b9d9b2db6905fdf0069c49a0d1f8936
Author: Sun Chenyang <[email protected]>
AuthorDate: Mon May 6 21:00:34 2024 +0800

    [fix](compaction test) show single replica compaction status and fix test 
(#33076) (#34285) (#34438)
---
 be/src/http/action/compaction_action.cpp  | 53 +++++++++++++++++++++++--------
 be/src/http/action/compaction_action.h    |  4 ++-
 be/src/olap/single_replica_compaction.cpp | 20 ++++++------
 be/src/olap/tablet.cpp                    | 48 ++++++++++++++++++++++------
 be/src/olap/tablet.h                      | 12 +++++++
 5 files changed, 103 insertions(+), 34 deletions(-)

diff --git a/be/src/http/action/compaction_action.cpp 
b/be/src/http/action/compaction_action.cpp
index 2b632108392..87c048529f8 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -41,6 +41,7 @@
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/full_compaction.h"
 #include "olap/olap_define.h"
+#include "olap/single_replica_compaction.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "util/doris_metrics.h"
@@ -116,6 +117,15 @@ Status 
CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
         return Status::NotSupported("The compaction type '{}' is not 
supported", compaction_type);
     }
 
+    // "remote" = "true" means tablet should do single replica compaction to 
fetch rowset from peer
+    bool fetch_from_remote = false;
+    std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
+    if (param_remote == "true") {
+        fetch_from_remote = true;
+    } else if (!param_remote.empty() && param_remote != "false") {
+        return Status::NotSupported("The remote = '{}' is not supported", 
param_remote);
+    }
+
     if (tablet_id == 0 && table_id != 0) {
         std::vector<TabletSharedPtr> tablet_vec =
                 StorageEngine::instance()->tablet_manager()->get_all_tablet(
@@ -133,9 +143,13 @@ Status 
CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
             return Status::NotFound("Tablet not found. tablet_id={}", 
tablet_id);
         }
 
+        if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
+            return Status::NotSupported("tablet should do compaction locally");
+        }
+
         // 3. execute compaction task
-        std::packaged_task<Status()> task([this, tablet, compaction_type]() {
-            return _execute_compaction_callback(tablet, compaction_type);
+        std::packaged_task<Status()> task([this, tablet, compaction_type, 
fetch_from_remote]() {
+            return _execute_compaction_callback(tablet, compaction_type, 
fetch_from_remote);
         });
         std::future<Status> future_obj = task.get_future();
         std::thread(std::move(task)).detach();
@@ -241,7 +255,8 @@ Status 
CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
 }
 
 Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
-                                                      const std::string& 
compaction_type) {
+                                                      const std::string& 
compaction_type,
+                                                      bool fetch_from_remote) {
     MonotonicStopWatch timer;
     timer.start();
 
@@ -261,18 +276,29 @@ Status 
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
             }
         }
     } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
-        CumulativeCompaction cumulative_compaction(tablet);
-        res = cumulative_compaction.compact();
-        if (!res) {
-            if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
-                // Ignore this error code.
-                VLOG_NOTICE << "failed to init cumulative compaction due to no 
suitable version,"
-                            << "tablet=" << tablet->tablet_id();
-            } else {
-                
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
-                LOG(WARNING) << "failed to do cumulative compaction. res=" << 
res
+        if (fetch_from_remote) {
+            SingleReplicaCompaction single_compaction(tablet,
+                                                      
CompactionType::CUMULATIVE_COMPACTION);
+            res = single_compaction.compact();
+            if (!res) {
+                LOG(WARNING) << "failed to do single compaction. res=" << res
                              << ", table=" << tablet->tablet_id();
             }
+        } else {
+            CumulativeCompaction cumulative_compaction(tablet);
+            res = cumulative_compaction.compact();
+            if (!res) {
+                if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
+                    // Ignore this error code.
+                    VLOG_NOTICE
+                            << "failed to init cumulative compaction due to no 
suitable version,"
+                            << "tablet=" << tablet->tablet_id();
+                } else {
+                    
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
+                    LOG(WARNING) << "failed to do cumulative compaction. res=" 
<< res
+                                 << ", table=" << tablet->tablet_id();
+                }
+            }
         }
     } else if (compaction_type == PARAM_COMPACTION_FULL) {
         FullCompaction full_compaction(tablet);
@@ -288,7 +314,6 @@ Status 
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
             }
         }
     }
-
     timer.stop();
     LOG(INFO) << "Manual compaction task finish, status=" << res
               << ", compaction_use_time=" << timer.elapsed_time() / 1000000 << 
"ms";
diff --git a/be/src/http/action/compaction_action.h 
b/be/src/http/action/compaction_action.h
index 1f0e67c6e92..111a54549ff 100644
--- a/be/src/http/action/compaction_action.h
+++ b/be/src/http/action/compaction_action.h
@@ -40,6 +40,7 @@ const std::string PARAM_COMPACTION_TYPE = "compact_type";
 const std::string PARAM_COMPACTION_BASE = "base";
 const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
 const std::string PARAM_COMPACTION_FULL = "full";
+const std::string PARAM_COMPACTION_REMOTE = "remote";
 
 /// This action is used for viewing the compaction status.
 /// See compaction-action.md for details.
@@ -60,7 +61,8 @@ private:
     Status _handle_run_compaction(HttpRequest* req, std::string* json_result);
 
     /// thread callback function for the tablet to do compaction
-    Status _execute_compaction_callback(TabletSharedPtr tablet, const 
std::string& compaction_type);
+    Status _execute_compaction_callback(TabletSharedPtr tablet, const 
std::string& compaction_type,
+                                        bool fethch_from_remote);
 
     /// fetch compaction running status
     Status _handle_run_status_compaction(HttpRequest* req, std::string* 
json_result);
diff --git a/be/src/olap/single_replica_compaction.cpp 
b/be/src/olap/single_replica_compaction.cpp
index 793f9d497c6..f9bd6549d65 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -66,6 +66,9 @@ Status SingleReplicaCompaction::pick_rowsets_to_compact() {
 }
 
 Status SingleReplicaCompaction::execute_compact_impl() {
+    if (!_tablet->should_fetch_from_peer()) {
+        return Status::Aborted("compaction should be performed locally");
+    }
     std::unique_lock<std::mutex> 
lock_cumu(_tablet->get_cumulative_compaction_lock(),
                                            std::try_to_lock);
     if (!lock_cumu.owns_lock()) {
@@ -113,8 +116,7 @@ Status 
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
     Version proper_version;
     // 3. find proper version to fetch
     if (!_find_rowset_to_fetch(peer_versions, &proper_version)) {
-        LOG(WARNING) << _tablet->tablet_id() << " tablet don't need to fetch, 
no matched version";
-        return Status::Aborted("no matched version to fetch");
+        return Status::Cancelled("no matched versions for single replica 
compaction");
     }
 
     // 4. fetch compaction result
@@ -131,6 +133,8 @@ Status 
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
         _tablet->set_last_full_compaction_success_time(UnixMillis());
     }
 
+    _tablet->set_last_fetched_version(_output_rowset->version());
+
     int64_t current_max_version;
     {
         std::shared_lock rdlock(_tablet->get_header_lock());
@@ -164,23 +168,19 @@ Status 
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
             
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
                                                                              
addr.brpc_port);
     if (stub == nullptr) {
-        LOG(WARNING) << "get rowset versions from peer: get rpc stub failed, 
host = " << addr.host
-                     << " port = " << addr.brpc_port;
-        return Status::Cancelled("get rpc stub failed");
+        return Status::Aborted("get rpc stub failed");
     }
 
     brpc::Controller cntl;
     stub->get_tablet_rowset_versions(&cntl, &request, &response, nullptr);
     if (cntl.Failed()) {
-        LOG(WARNING) << "open brpc connection to " << addr.host << " failed: " 
<< cntl.ErrorText();
-        return Status::Cancelled("open brpc connection failed");
+        return Status::Aborted("open brpc connection failed");
     }
     if (response.status().status_code() != 0) {
-        LOG(WARNING) << "peer " << addr.host << " don't have tablet " << 
_tablet->tablet_id();
-        return Status::Cancelled("peer don't have tablet");
+        return Status::Aborted("peer don't have tablet");
     }
     if (response.versions_size() == 0) {
-        return Status::Cancelled("no peer version");
+        return Status::Aborted("no peer version");
     }
     for (int i = 0; i < response.versions_size(); ++i) {
         (*peer_versions)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1227996503a..61a1961edf9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1488,18 +1488,37 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
                                            root.GetAllocator());
     root.AddMember("last base status", base_compaction_status_value, 
root.GetAllocator());
 
+    // last single replica compaction status
+    // "single replica compaction status": {
+    //     "remote peer": "172.100.1.0:10875",
+    //     "last failure status": "",
+    //     "last fetched rowset": "[8-10]"
+    // }
+    rapidjson::Document status;
+    status.SetObject();
     TReplicaInfo replica_info;
     std::string dummp_token;
-    rapidjson::Value fetch_addr;
     if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
-        _engine.get_peer_replica_info(tablet_id(), &replica_info, 
&dummp_token)) {
+        StorageEngine::instance()->get_peer_replica_info(tablet_id(), 
&replica_info,
+                                                         &dummp_token)) {
+        // remote peer
+        rapidjson::Value peer_addr;
         std::string addr = replica_info.host + ":" + 
std::to_string(replica_info.brpc_port);
-        fetch_addr.SetString(addr.c_str(), addr.length(), root.GetAllocator());
-    } else {
-        // -1 means do compaction locally
-        fetch_addr.SetString("-1", root.GetAllocator());
+        peer_addr.SetString(addr.c_str(), addr.length(), 
status.GetAllocator());
+        status.AddMember("remote peer", peer_addr, status.GetAllocator());
+        // last failure status
+        rapidjson::Value compaction_status;
+        
compaction_status.SetString(_last_single_compaction_failure_status.c_str(),
+                                    
_last_single_compaction_failure_status.length(),
+                                    status.GetAllocator());
+        status.AddMember("last failure status", compaction_status, 
status.GetAllocator());
+        // last fetched rowset
+        rapidjson::Value version;
+        std::string fetched_version = _last_fetched_version.to_string();
+        version.SetString(fetched_version.c_str(), fetched_version.length(), 
status.GetAllocator());
+        status.AddMember("last fetched rowset", version, 
status.GetAllocator());
+        root.AddMember("single replica compaction status", status, 
root.GetAllocator());
     }
-    root.AddMember("fetch from peer", fetch_addr, root.GetAllocator());
 
     // print all rowsets' version as an array
     rapidjson::Document versions_arr;
@@ -1884,13 +1903,24 @@ void 
Tablet::execute_single_replica_compaction(SingleReplicaCompaction& compacti
     Status res = compaction.execute_compact();
     if (!res.ok()) {
         set_last_failure_time(this, compaction, UnixMillis());
-        LOG(WARNING) << "failed to do single replica compaction. res=" << res
-                     << ", tablet=" << tablet_id();
+        set_last_single_compaction_failure_status(res.to_string());
+        if (res.is<CANCELLED>()) {
+            VLOG_CRITICAL << "Cannel fetching from the remote peer. res=" << 
res
+                          << ", tablet=" << tablet_id();
+        } else {
+            LOG(WARNING) << "failed to do single replica compaction. res=" << 
res
+                         << ", tablet=" << tablet_id();
+        }
         return;
     }
     set_last_failure_time(this, compaction, 0);
 }
 
+bool Tablet::should_fetch_from_peer() {
+    return tablet_meta()->tablet_schema()->enable_single_replica_compaction() 
&&
+           StorageEngine::instance()->should_fetch_from_peer(tablet_id());
+}
+
 std::vector<Version> Tablet::get_all_local_versions() {
     std::vector<Version> local_versions;
     {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index ca2fe1d70a4..7a959f7272f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -272,6 +272,12 @@ public:
         _last_base_compaction_schedule_millis = millis;
     }
 
+    void set_last_single_compaction_failure_status(std::string status) {
+        _last_single_compaction_failure_status = std::move(status);
+    }
+
+    void set_last_fetched_version(Version version) { _last_fetched_version = 
std::move(version); }
+
     void delete_all_files();
 
     void check_tablet_path_exists();
@@ -339,6 +345,8 @@ public:
 
     std::string get_last_base_compaction_status() { return 
_last_base_compaction_status; }
 
+    bool should_fetch_from_peer();
+
     inline bool all_beta() const {
         std::shared_lock rdlock(_meta_lock);
         return _tablet_meta->all_beta();
@@ -692,6 +700,10 @@ private:
     std::atomic<int64_t> _last_checkpoint_time;
     std::string _last_base_compaction_status;
 
+    // single replica compaction status
+    std::string _last_single_compaction_failure_status;
+    Version _last_fetched_version;
+
     // cumulative compaction policy
     std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
     std::string_view _cumulative_compaction_type;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to