This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 78bfa52953e [cherry-pick](branch-3.0) Pick "[Enhancement](MS) Add fix 
tablet data size api for meta service (#41782)" (#43460)
78bfa52953e is described below

commit 78bfa52953ee411f65049c329dcad9b6ed6d78af
Author: abmdocrt <[email protected]>
AuthorDate: Mon Nov 11 22:45:05 2024 +0800

    [cherry-pick](branch-3.0) Pick "[Enhancement](MS) Add fix tablet data size 
api for meta service (#41782)" (#43460)
    
    Pick #41782
    
    ---------
    
    Co-authored-by: Yukang-Lian <[email protected]>
---
 be/src/cloud/cloud_full_compaction.cpp             |   4 +
 cloud/src/meta-service/meta_service.cpp            |  50 +++++
 cloud/src/meta-service/meta_service.h              |   7 +
 cloud/src/meta-service/meta_service_http.cpp       |  13 ++
 .../src/meta-service/meta_service_tablet_stats.cpp | 245 +++++++++++++++++++++
 cloud/src/meta-service/meta_service_tablet_stats.h |  15 ++
 .../test_fix_tablet_stat_fault_injection.out       |  13 ++
 .../org/apache/doris/regression/suite/Suite.groovy |  21 ++
 .../test_fix_tablet_stat_fault_injection.groovy    | 159 +++++++++++++
 9 files changed, 527 insertions(+)

diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index 2e11891045c..f22c449223c 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -195,6 +195,10 @@ Status CloudFullCompaction::modify_rowsets() {
     compaction_job->set_num_output_rows(_output_rowset->num_rows());
     compaction_job->set_size_input_rowsets(_input_rowsets_size);
     compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
+    
DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_compaction_data_size",
 {
+        compaction_job->set_size_input_rowsets(1);
+        compaction_job->set_size_output_rowsets(10000001);
+    })
     compaction_job->set_num_input_segments(_input_segments);
     compaction_job->set_num_output_segments(_output_rowset->num_segments());
     compaction_job->set_num_input_rowsets(_input_rowsets.size());
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 5b9e0c5462f..a59869196e3 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2256,4 +2256,54 @@ std::pair<MetaServiceCode, std::string> 
MetaServiceImpl::get_instance_info(
     return {code, std::move(msg)};
 }
 
+std::pair<std::string, std::string> init_key_pair(std::string instance_id, 
int64_t table_id) {
+    std::string begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0});
+    std::string end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 
0});
+    return std::make_pair(begin_key, end_key);
+}
+
+MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string 
cloud_unique_id_str,
+                                                            std::string 
table_id_str) {
+    // parse params
+    int64_t table_id;
+    std::string instance_id;
+    MetaServiceResponseStatus st = parse_fix_tablet_stats_param(
+            resource_mgr_, table_id_str, cloud_unique_id_str, table_id, 
instance_id);
+    if (st.code() != MetaServiceCode::OK) {
+        return st;
+    }
+
+    std::pair<std::string, std::string> key_pair = init_key_pair(instance_id, 
table_id);
+    std::string old_begin_key;
+    while (old_begin_key < key_pair.first) {
+        // get tablet stats
+        std::vector<std::shared_ptr<TabletStatsPB>> 
tablet_stat_shared_ptr_vec_batch;
+        old_begin_key = key_pair.first;
+
+        // fix tablet stats
+        size_t retry = 0;
+        do {
+            st = fix_tablet_stats_internal(txn_kv_, key_pair, 
tablet_stat_shared_ptr_vec_batch,
+                                           instance_id);
+            if (st.code() != MetaServiceCode::OK) {
+                LOG_WARNING("failed to fix tablet stats")
+                        .tag("err", st.msg())
+                        .tag("table id", table_id)
+                        .tag("retry time", retry);
+            }
+            retry++;
+        } while (st.code() != MetaServiceCode::OK && retry < 3);
+        if (st.code() != MetaServiceCode::OK) {
+            return st;
+        }
+
+        // Check tablet stats
+        st = check_new_tablet_stats(txn_kv_, instance_id, 
tablet_stat_shared_ptr_vec_batch);
+        if (st.code() != MetaServiceCode::OK) {
+            return st;
+        }
+    }
+    return st;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 55e8626b6bf..7af96cbc14b 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -40,6 +40,10 @@ class Transaction;
 
 constexpr std::string_view BUILT_IN_STORAGE_VAULT_NAME = 
"built_in_storage_vault";
 
+void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
+                         const std::string& instance_id, int64_t tablet_id, 
MetaServiceCode& code,
+                         std::string& msg, GetRowsetResponse* response);
+
 class MetaServiceImpl : public cloud::MetaService {
 public:
     MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, 
std::shared_ptr<ResourceManager> resource_mgr,
@@ -303,6 +307,9 @@ public:
                                                               const 
std::string& cloud_unique_id,
                                                               InstanceInfoPB* 
instance);
 
+    MetaServiceResponseStatus fix_tablet_stats(std::string cloud_unique_id_str,
+                                               std::string table_id_str);
+
 private:
     std::pair<MetaServiceCode, std::string> alter_instance(
             const AlterInstanceRequest* request,
diff --git a/cloud/src/meta-service/meta_service_http.cpp 
b/cloud/src/meta-service/meta_service_http.cpp
index 9a9f6de97cc..95ed5d614a3 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -468,6 +468,16 @@ static HttpResponse 
process_get_tablet_stats(MetaServiceImpl* service, brpc::Con
     return http_text_reply(resp.status(), body);
 }
 
+static HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
+    auto& uri = ctrl->http_request().uri();
+    std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id");
+    std::string_view table_id = http_query(uri, "table_id");
+
+    MetaServiceResponseStatus st =
+            service->fix_tablet_stats(std::string(cloud_unique_id), 
std::string(table_id));
+    return http_text_reply(st, st.DebugString());
+}
+
 static HttpResponse process_get_stage(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
     GetStageRequest req;
     PARSE_MESSAGE_OR_RETURN(ctrl, req);
@@ -575,6 +585,7 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"get_value", process_get_value},
             {"show_meta_ranges", process_show_meta_ranges},
             {"txn_lazy_commit", process_txn_lazy_commit},
+            {"fix_tablet_stats", process_fix_tablet_stats},
             {"v1/decode_key", process_decode_key},
             {"v1/encode_key", process_encode_key},
             {"v1/get_value", process_get_value},
@@ -582,6 +593,8 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"v1/txn_lazy_commit", process_txn_lazy_commit},
             // for get
             {"get_instance", process_get_instance_info},
+            // for get
+            {"get_instance", process_get_instance_info},
             {"get_obj_store_info", process_get_obj_store_info},
             {"get_cluster", process_get_cluster},
             {"get_tablet_stats", process_get_tablet_stats},
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.cpp 
b/cloud/src/meta-service/meta_service_tablet_stats.cpp
index 501cecbab76..cecccbd6767 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.cpp
+++ b/cloud/src/meta-service/meta_service_tablet_stats.cpp
@@ -17,13 +17,22 @@
 
 #include "meta-service/meta_service_tablet_stats.h"
 
+#include <fmt/core.h>
 #include <fmt/format.h>
+#include <gen_cpp/cloud.pb.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
 
 #include "common/logging.h"
 #include "common/util.h"
 #include "meta-service/keys.h"
+#include "meta-service/meta_service.h"
 #include "meta-service/meta_service_helper.h"
 #include "meta-service/txn_kv.h"
+#include "meta-service/txn_kv_error.h"
 
 namespace doris::cloud {
 
@@ -156,4 +165,240 @@ void internal_get_tablet_stats(MetaServiceCode& code, 
std::string& msg, Transact
     merge_tablet_stats(stats, detached_stats);
 }
 
+MetaServiceResponseStatus parse_fix_tablet_stats_param(
+        std::shared_ptr<ResourceManager> resource_mgr, const std::string& 
table_id_str,
+        const std::string& cloud_unique_id_str, int64_t& table_id, 
std::string& instance_id) {
+    MetaServiceCode code = MetaServiceCode::OK;
+    std::string msg;
+    MetaServiceResponseStatus st;
+    st.set_code(MetaServiceCode::OK);
+
+    // parse params
+    try {
+        table_id = std::stoll(table_id_str);
+    } catch (...) {
+        st.set_code(MetaServiceCode::INVALID_ARGUMENT);
+        st.set_msg("Invalid table_id, table_id: " + table_id_str);
+        return st;
+    }
+
+    instance_id = get_instance_id(resource_mgr, cloud_unique_id_str);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id_str;
+        st.set_code(code);
+        st.set_msg(msg);
+        return st;
+    }
+    return st;
+}
+
+MetaServiceResponseStatus fix_tablet_stats_internal(
+        std::shared_ptr<TxnKv> txn_kv, std::pair<std::string, std::string>& 
key_pair,
+        std::vector<std::shared_ptr<TabletStatsPB>>& 
tablet_stat_shared_ptr_vec_batch,
+        const std::string& instance_id, size_t batch_size) {
+    std::unique_ptr<Transaction> txn;
+    MetaServiceResponseStatus st;
+    st.set_code(MetaServiceCode::OK);
+    MetaServiceCode code = MetaServiceCode::OK;
+    std::unique_ptr<RangeGetIterator> it;
+    std::vector<std::shared_ptr<TabletStatsPB>> tmp_tablet_stat_vec;
+
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        st.set_code(cast_as<ErrCategory::CREATE>(err));
+        st.set_msg("failed to create txn");
+        return st;
+    }
+
+    // read tablet stats
+    err = txn->get(key_pair.first, key_pair.second, &it, true);
+    if (err != TxnErrorCode::TXN_OK) {
+        st.set_code(cast_as<ErrCategory::READ>(err));
+        st.set_msg(fmt::format("failed to get tablet stats, err={} ", err));
+        return st;
+    }
+
+    size_t tablet_cnt = 0;
+    while (it->has_next() && tablet_cnt < batch_size) {
+        auto [k, v] = it->next();
+        key_pair.first = k;
+        auto k1 = k;
+        k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
+        decode_key(&k1, &out);
+
+        // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} 
${partition_id} ${tablet_id} -> TabletStatsPB
+        if (out.size() == 7) {
+            tablet_cnt++;
+            TabletStatsPB tablet_stat;
+            tablet_stat.ParseFromArray(v.data(), v.size());
+            
tmp_tablet_stat_vec.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat));
+        }
+    }
+    if (it->has_next()) {
+        key_pair.first = it->next().first;
+    }
+
+    for (const auto& tablet_stat_ptr : tmp_tablet_stat_vec) {
+        GetRowsetResponse resp;
+        std::string msg;
+        // get rowsets in tablet and accumulate disk size
+        internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() 
- 1, instance_id,
+                            tablet_stat_ptr->idx().tablet_id(), code, msg, 
&resp);
+        if (code != MetaServiceCode::OK) {
+            st.set_code(code);
+            st.set_msg(msg);
+            return st;
+        }
+        int64_t total_disk_size = 0;
+        for (const auto& rs_meta : resp.rowset_meta()) {
+            total_disk_size += rs_meta.total_disk_size();
+        }
+
+        // set new disk size to tabletPB and write it back
+        TabletStatsPB tablet_stat;
+        tablet_stat.CopyFrom(*tablet_stat_ptr);
+        tablet_stat.set_data_size(total_disk_size);
+        // record tablet stats batch
+        
tablet_stat_shared_ptr_vec_batch.emplace_back(std::make_shared<TabletStatsPB>(tablet_stat));
+        std::string tablet_stat_key;
+        std::string tablet_stat_value;
+        tablet_stat_key = stats_tablet_key(
+                {instance_id, tablet_stat.idx().table_id(), 
tablet_stat.idx().index_id(),
+                 tablet_stat.idx().partition_id(), 
tablet_stat.idx().tablet_id()});
+        if (!tablet_stat.SerializeToString(&tablet_stat_value)) {
+            st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR);
+            st.set_msg("failed to serialize tablet stat");
+            return st;
+        }
+        txn->put(tablet_stat_key, tablet_stat_value);
+
+        // read num segs
+        // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} 
${partition_id} ${tablet_id} "num_segs" -> int64
+        std::string tablet_stat_num_segs_key;
+        stats_tablet_num_segs_key(
+                {instance_id, tablet_stat_ptr->idx().table_id(), 
tablet_stat_ptr->idx().index_id(),
+                 tablet_stat_ptr->idx().partition_id(), 
tablet_stat_ptr->idx().tablet_id()},
+                &tablet_stat_num_segs_key);
+        int64_t tablet_stat_num_segs = 0;
+        std::string tablet_stat_num_segs_value(sizeof(tablet_stat_num_segs), 
'\0');
+        err = txn->get(tablet_stat_num_segs_key, &tablet_stat_num_segs_value);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            st.set_code(cast_as<ErrCategory::READ>(err));
+        }
+        if (tablet_stat_num_segs_value.size() != sizeof(tablet_stat_num_segs)) 
[[unlikely]] {
+            LOG(WARNING) << " malformed tablet stats value v.size="
+                         << tablet_stat_num_segs_value.size()
+                         << " value=" << hex(tablet_stat_num_segs_value);
+        }
+        std::memcpy(&tablet_stat_num_segs, tablet_stat_num_segs_value.data(),
+                    sizeof(tablet_stat_num_segs));
+        if constexpr (std::endian::native == std::endian::big) {
+            tablet_stat_num_segs = bswap_64(tablet_stat_num_segs);
+        }
+
+        if (tablet_stat_num_segs > 0) {
+            // set tablet stats data size = 0
+            // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} 
${partition_id} ${tablet_id} "data_size" -> int64
+            std::string tablet_stat_data_size_key;
+            stats_tablet_data_size_key(
+                    {instance_id, tablet_stat.idx().table_id(), 
tablet_stat.idx().index_id(),
+                     tablet_stat.idx().partition_id(), 
tablet_stat.idx().tablet_id()},
+                    &tablet_stat_data_size_key);
+            int64_t tablet_stat_data_size = 0;
+            std::string 
tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
+            memcpy(tablet_stat_data_size_value.data(), &tablet_stat_data_size,
+                   sizeof(tablet_stat_data_size));
+            txn->put(tablet_stat_data_size_key, tablet_stat_data_size_value);
+        }
+    }
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        st.set_code(cast_as<ErrCategory::COMMIT>(err));
+        st.set_msg("failed to commit txn");
+        return st;
+    }
+    return st;
+}
+
+MetaServiceResponseStatus check_new_tablet_stats(
+        std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
+        const std::vector<std::shared_ptr<TabletStatsPB>>& 
tablet_stat_shared_ptr_vec_batch) {
+    std::unique_ptr<Transaction> txn;
+    MetaServiceResponseStatus st;
+    st.set_code(MetaServiceCode::OK);
+
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        st.set_code(cast_as<ErrCategory::CREATE>(err));
+        st.set_msg("failed to create txn");
+        return st;
+    }
+
+    for (const auto& tablet_stat_ptr : tablet_stat_shared_ptr_vec_batch) {
+        // check tablet stats
+        std::string tablet_stat_key;
+        std::string tablet_stat_value;
+        tablet_stat_key = stats_tablet_key(
+                {instance_id, tablet_stat_ptr->idx().table_id(), 
tablet_stat_ptr->idx().index_id(),
+                 tablet_stat_ptr->idx().partition_id(), 
tablet_stat_ptr->idx().tablet_id()});
+        err = txn->get(tablet_stat_key, &tablet_stat_value);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            st.set_code(cast_as<ErrCategory::READ>(err));
+            return st;
+        }
+        TabletStatsPB tablet_stat_check;
+        tablet_stat_check.ParseFromArray(tablet_stat_value.data(), 
tablet_stat_value.size());
+        if (tablet_stat_check.DebugString() != tablet_stat_ptr->DebugString() 
&&
+            // If anyone data size of tablet_stat_check and tablet_stat_ptr is 
twice bigger than another,
+            // we need to rewrite it this tablet_stat.
+            (tablet_stat_check.data_size() > 2 * tablet_stat_ptr->data_size() 
||
+             tablet_stat_ptr->data_size() > 2 * 
tablet_stat_check.data_size())) {
+            LOG_WARNING("[fix tablet stats]:tablet stats check failed")
+                    .tag("tablet stat", tablet_stat_ptr->DebugString())
+                    .tag("check tabelt stat", tablet_stat_check.DebugString());
+        }
+
+        // check data size
+        std::string tablet_stat_data_size_key;
+        stats_tablet_data_size_key(
+                {instance_id, tablet_stat_ptr->idx().table_id(), 
tablet_stat_ptr->idx().index_id(),
+                 tablet_stat_ptr->idx().partition_id(), 
tablet_stat_ptr->idx().tablet_id()},
+                &tablet_stat_data_size_key);
+        int64_t tablet_stat_data_size = 0;
+        std::string tablet_stat_data_size_value(sizeof(tablet_stat_data_size), 
'\0');
+        err = txn->get(tablet_stat_data_size_key, 
&tablet_stat_data_size_value);
+        if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            st.set_code(cast_as<ErrCategory::READ>(err));
+            return st;
+        }
+        int64_t tablet_stat_data_size_check;
+
+        if (tablet_stat_data_size_value.size() != 
sizeof(tablet_stat_data_size_check))
+                [[unlikely]] {
+            LOG(WARNING) << " malformed tablet stats value v.size="
+                         << tablet_stat_data_size_value.size()
+                         << " value=" << hex(tablet_stat_data_size_value);
+        }
+        std::memcpy(&tablet_stat_data_size_check, 
tablet_stat_data_size_value.data(),
+                    sizeof(tablet_stat_data_size_check));
+        if constexpr (std::endian::native == std::endian::big) {
+            tablet_stat_data_size_check = 
bswap_64(tablet_stat_data_size_check);
+        }
+        if (tablet_stat_data_size_check != tablet_stat_data_size &&
+            // ditto
+            (tablet_stat_data_size_check > 2 * tablet_stat_data_size ||
+             tablet_stat_data_size > 2 * tablet_stat_data_size_check)) {
+            LOG_WARNING("[fix tablet stats]:data size check failed")
+                    .tag("data size", tablet_stat_data_size)
+                    .tag("check data size", tablet_stat_data_size_check);
+        }
+    }
+
+    return st;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_service_tablet_stats.h 
b/cloud/src/meta-service/meta_service_tablet_stats.h
index 5726cf50b76..a7aea5885a8 100644
--- a/cloud/src/meta-service/meta_service_tablet_stats.h
+++ b/cloud/src/meta-service/meta_service_tablet_stats.h
@@ -19,6 +19,8 @@
 
 #include <gen_cpp/cloud.pb.h>
 
+#include "resource-manager/resource_manager.h"
+
 namespace doris::cloud {
 class Transaction;
 class RangeGetIterator;
@@ -66,4 +68,17 @@ void internal_get_tablet_stats(MetaServiceCode& code, 
std::string& msg, Transact
                                             TabletStats& detached_stats);
 // clang-format on
 
+MetaServiceResponseStatus parse_fix_tablet_stats_param(
+        std::shared_ptr<ResourceManager> resource_mgr, const std::string& 
table_id_str,
+        const std::string& cloud_unique_id_str, int64_t& table_id, 
std::string& instance_id);
+
+MetaServiceResponseStatus fix_tablet_stats_internal(
+        std::shared_ptr<TxnKv> txn_kv, std::pair<std::string, std::string>& 
key_pair,
+        std::vector<std::shared_ptr<TabletStatsPB>>& 
tablet_stat_shared_ptr_vec_batch,
+        const std::string& instance_id, size_t batch_size = 20);
+
+MetaServiceResponseStatus check_new_tablet_stats(
+        std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
+        const std::vector<std::shared_ptr<TabletStatsPB>>& 
tablet_stat_shared_ptr_vec_batch);
+
 } // namespace doris::cloud
diff --git 
a/regression-test/data/fault_injection_p0/test_fix_tablet_stat_fault_injection.out
 
b/regression-test/data/fault_injection_p0/test_fix_tablet_stat_fault_injection.out
new file mode 100644
index 00000000000..a9db9fa716e
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/test_fix_tablet_stat_fault_injection.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_1 --
+test_fix_tablet_stat_fault_injection   test_fix_tablet_stat_fault_injection    
518.911 KB      1000    500     0.000 
+       Total   518.911 KB      1000            0.000 
+
+-- !select_2 --
+test_fix_tablet_stat_fault_injection   test_fix_tablet_stat_fault_injection    
9.314 GB        1000    100     0.000 
+       Total   9.314 GB        1000            0.000 
+
+-- !select_3 --
+test_fix_tablet_stat_fault_injection   test_fix_tablet_stat_fault_injection    
114.974 KB      1000    100     0.000 
+       Total   114.974 KB      1000            0.000 
+
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 2802e51f45f..5e56f593fb5 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -2460,6 +2460,27 @@ class Suite implements GroovyInterceptable {
         }
     }
 
+    def fix_tablet_stats = { table_id ->
+        def jsonOutput = new JsonOutput()
+        def map = []
+        def js = jsonOutput.toJson(map)
+        log.info("fix tablet stat req: 
/MetaService/http/fix_tablet_stats?token=${token}&cloud_unique_id=${instance_id}&table_id=${table_id}
 ".toString())
+
+        def fix_tablet_stats_api = { request_body, check_func ->
+            httpTest {
+                endpoint context.config.metaServiceHttpAddress
+                uri 
"/MetaService/http/fix_tablet_stats?token=${token}&cloud_unique_id=${instance_id}&table_id=${table_id}"
+                body request_body
+                check check_func
+            }
+        }
+
+        fix_tablet_stats_api.call(js) {
+            respCode, body ->
+                log.info("fix tablet stats resp: ${body} 
${respCode}".toString())
+        }
+    }
+
     public void resetConnection() {
         context.resetConnection()
     }
diff --git 
a/regression-test/suites/fault_injection_p0/test_fix_tablet_stat_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_fix_tablet_stat_fault_injection.groovy
new file mode 100644
index 00000000000..d96f6f0ec48
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_fix_tablet_stat_fault_injection.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
suite("test_fix_tablet_stat_fault_injection", "nonConcurrent") {
    // The fix_tablet_stats API is served by the cloud meta service only.
    if (!isCloudMode()) {
        return
    }

    def tableName = "test_fix_tablet_stat_fault_injection"
    // Makes full compaction report bogus input/output rowset sizes (1 and 10000001 bytes),
    // which inflates the tablet data size.
    def wrongSizeDebugPoint = "CloudFullCompaction::modify_rowsets.wrong_compaction_data_size"
    def bucketSize = 10
    def partitionSize = 100
    def maxPartition = partitionSize + 1
    def create_table_sql = """
                    CREATE TABLE IF NOT EXISTS ${tableName}
                        (
                        `k1` INT NULL,
                        `v1` INT NULL,
                        `v2` INT NULL
                        )
                        UNIQUE KEY (k1)
                        PARTITION BY RANGE(`k1`)
                        (
                            FROM (1) TO (${maxPartition}) INTERVAL 1
                        )
                        DISTRIBUTED BY HASH(`k1`) BUCKETS ${bucketSize}
                        PROPERTIES (
                        "replication_num" = "1",
                        "disable_auto_compaction" = "true"
                        );
                """

    // Sums the rowset counts reported by every tablet's compaction-status endpoint.
    // When `stage` is non-null, each response is logged as "Show tablets status <stage>: ...".
    def countRowsets = { tabletList, String stage ->
        int total = 0
        for (def tablet in tabletList) {
            def (code, out, err) = curl("GET", tablet.CompactionStatus)
            if (stage != null) {
                logger.info("Show tablets status " + stage + ": code=" + code + ", out=" + out + ", err=" + err)
            }
            assertEquals(code, 0)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            total += ((List) tabletJson.rowsets).size()
        }
        return total
    }

    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)

    try {
        GetDebugPoint().enableDebugPointForAllBEs(wrongSizeDebugPoint)

        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql "${create_table_sql}"
        // Five single-row loads per partition: (i,1,1) .. (i,5,5).
        for (int i = 1; i <= partitionSize; i++) {
            for (int v = 1; v <= 5; v++) {
                sql "insert into ${tableName} values (${i},${v},${v});"
            }
        }

        sql "select count(*) from ${tableName};"
        sleep(60000)
        qt_select_1 "show data from ${tableName};"

        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
        // Before full compaction each tablet holds 6 rowsets: the 5 loads plus, presumably,
        // the initial empty rowset.
        def rowsetCount = countRowsets(tablets, "after insert data")
        assert (rowsetCount == 6 * bucketSize * partitionSize)

        // Trigger a full compaction on every tablet, with at most 10 attempts per tablet.
        for (def tablet in tablets) {
            String tablet_id = tablet.TabletId
            def backend_id = tablet.BackendId
            def compactJson = null
            int attempts = 0
            while (true) {
                def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
                attempts++
                compactJson = parseJson(out.trim())
                if (compactJson.status.toLowerCase() == "success" || attempts >= 10) {
                    break
                }
            }
            assertEquals("success", compactJson.status.toLowerCase())
        }

        // Poll each tablet until its compaction status no longer reports run_status.
        for (def tablet in tablets) {
            String tablet_id = tablet.TabletId
            def backend_id = tablet.BackendId
            def running = true
            while (running) {
                def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
                assertEquals(code, 0)
                def compactionStatus = parseJson(out.trim())
                assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            }
        }

        sleep(60000)
        // After full compaction each tablet is expected to hold 2 rowsets. The count is
        // collected (and each status request checked) but intentionally not asserted.
        rowsetCount = countRowsets(tablets, "after full compaction")
        // assert (rowsetCount == 2 * bucketSize * partitionSize)

        // The injected compaction sizes make the reported data size very large.
        sql "select count(*) from ${tableName};"
        qt_select_2 "show data from ${tableName};"

        fix_tablet_stats(getTableId(tableName))

        sleep(60000)
        rowsetCount = countRowsets(tablets, null)
        // assert (rowsetCount == 2 * bucketSize * partitionSize)
        // After fixing the tablet stats the data size should be back to normal.
        sql "select count(*) from ${tableName};"
        qt_select_3 "show data from ${tableName};"
    } finally {
        //try_sql("DROP TABLE IF EXISTS ${tableName}")
        GetDebugPoint().disableDebugPointForAllBEs(wrongSizeDebugPoint)
    }
}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to