This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 007ad37efd8 [feat](cloud) process_compaction_job supports versioned 
read and save min read version (#55079)
007ad37efd8 is described below

commit 007ad37efd818e22dd10467162ebe7886f237ab7
Author: walter <[email protected]>
AuthorDate: Mon Aug 25 16:09:12 2025 +0800

    [feat](cloud) process_compaction_job supports versioned read and save min 
read version (#55079)
---
 cloud/src/meta-service/meta_service_job.cpp | 196 +++++++++++++++----------
 cloud/test/meta_service_job_test.cpp        | 217 +++++++++++++++++++++++++++-
 2 files changed, 339 insertions(+), 74 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 6ac1ed02d2d..12c45a6759f 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -784,6 +784,46 @@ int compaction_update_tablet_stats(const 
TabletCompactionJobPB& compaction, Tabl
     return 0;
 }
 
+std::pair<MetaServiceCode, std::string> scan_compaction_input_rowsets(
+        Transaction* txn, std::string_view instance_id, int64_t tablet_id, 
std::string& rs_start,
+        std::string& rs_end, int& num_rowsets, auto&& callback) {
+    std::unique_ptr<RangeGetIterator> it;
+    DORIS_CLOUD_DEFER {
+        INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets 
<< " range=["
+                           << hex(rs_start) << "," << hex(rs_end) << "]";
+    };
+
+    auto rs_start1 = rs_start;
+    do {
+        TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            return {cast_as<ErrCategory::READ>(err),
+                    fmt::format("internal error, failed to get rowset range, 
err={} tablet_id={} "
+                                "range=[{}, {})",
+                                err, tablet_id, hex(rs_start), hex(rs_end))};
+        }
+
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+
+            doris::RowsetMetaCloudPB rs;
+            if (!rs.ParseFromArray(v.data(), v.size())) {
+                return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+                        fmt::format(
+                                "malformed rowset meta, unable to deserialize, 
tablet_id={} key={}",
+                                tablet_id, hex(k))};
+            }
+
+            callback(std::move(rs));
+
+            ++num_rowsets;
+            if (!it->has_next()) rs_start1 = k;
+        }
+        rs_start1.push_back('\x00'); // Update to next smallest key for 
iteration
+    } while (it->more());
+    return {MetaServiceCode::OK, ""};
+}
+
 void process_compaction_job(MetaServiceCode& code, std::string& msg, 
std::stringstream& ss,
                             std::unique_ptr<Transaction>& txn,
                             const FinishTabletJobRequest* request,
@@ -906,34 +946,51 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
     //                          Update tablet stats
     
//==========================================================================
     auto stats = response->mutable_stats();
+
+    MetaReader meta_reader(instance_id);
     TabletStats detached_stats;
-    // ATTN: The condition that snapshot read can be used to get tablet stats 
is: all other transactions that put tablet stats
-    //  can make read write conflicts with this transaction on other keys. 
Currently, if all meta-service nodes are running
-    //  with `config::split_tablet_stats = true` can meet the condition.
-    internal_get_tablet_stats(code, msg, txn.get(), instance_id, 
request->job().idx(), *stats,
-                              detached_stats, 
config::snapshot_get_tablet_stats);
-    if (code != MetaServiceCode::OK) {
-        LOG_WARNING("failed to get tablet stats")
-                .tag("instance_id", instance_id)
-                .tag("tablet_id", tablet_id)
-                .tag("code", code)
-                .tag("msg", msg);
-        return;
+    if (is_versioned_read) {
+        TxnErrorCode err =
+                meta_reader.get_tablet_compact_stats(txn.get(), tablet_id, 
stats, nullptr, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get tablet stats, tablet_id={}, 
err={}", tablet_id, err);
+            LOG(WARNING) << msg;
+            return;
+        }
+    } else {
+        // ATTN: The condition that snapshot read can be used to get tablet 
stats is: all other transactions that put tablet stats
+        //  can make read write conflicts with this transaction on other keys. 
Currently, if all meta-service nodes are running
+        //  with `config::split_tablet_stats = true` can meet the condition.
+        internal_get_tablet_stats(code, msg, txn.get(), instance_id, 
request->job().idx(), *stats,
+                                  detached_stats, 
config::snapshot_get_tablet_stats);
+        if (code != MetaServiceCode::OK) {
+            LOG_WARNING("failed to get tablet stats")
+                    .tag("instance_id", instance_id)
+                    .tag("tablet_id", tablet_id)
+                    .tag("code", code)
+                    .tag("msg", msg);
+            return;
+        }
     }
 
     if (is_versioned_write) {
         // read old TabletCompactStatsKey -> TabletStatsPB
         TabletStatsPB tablet_compact_stats;
-        MetaReader meta_reader(instance_id, txn_kv);
-        Versionstamp* versionstamp = nullptr;
-        TxnErrorCode err = meta_reader.get_tablet_compact_stats(
-                txn.get(), tablet_id, &tablet_compact_stats, versionstamp, 
false);
+        TxnErrorCode err = TxnErrorCode::TXN_OK;
+        if (is_versioned_read) {
+            // Reuse the above txn::get result.
+            tablet_compact_stats.CopyFrom(*stats);
+        } else {
+            err = meta_reader.get_tablet_compact_stats(txn.get(), tablet_id, 
&tablet_compact_stats,
+                                                       nullptr, false);
+        }
         if (err == TxnErrorCode::TXN_OK) {
             // tablet_compact_stats exists, update TabletStatsPB
             if (compaction_update_tablet_stats(compaction, 
&tablet_compact_stats, code, msg, now) ==
                 -1) {
                 LOG_WARNING("compaction_update_tablet_stats failed.")
-                        .tag("instancej_id", instance_id)
+                        .tag("instance_id", instance_id)
                         .tag("tablet_id", tablet_id)
                         .tag("compact_stats", 
tablet_compact_stats.ShortDebugString());
                 return;
@@ -998,7 +1055,7 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
                << " compaction.size_input_rowsets=" << 
compaction.size_input_rowsets();
     txn->put(stats_key, stats_val);
     merge_tablet_stats(*stats, detached_stats); // this is to check
-    if (stats->data_size() < 0 || stats->num_rowsets() < 1) [[unlikely]] {
+    if (!is_versioned_read && (stats->data_size() < 0 || stats->num_rowsets() 
< 1)) [[unlikely]] {
         INSTANCE_LOG(ERROR) << "buggy data size, tablet_id=" << tablet_id
                             << " stats.num_rows=" << stats->num_rows()
                             << " stats.data_size=" << stats->data_size()
@@ -1062,68 +1119,58 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
 
     compaction_log.set_start_version(start);
     compaction_log.set_end_version(end);
-
-    std::unique_ptr<RangeGetIterator> it;
     int num_rowsets = 0;
-    DORIS_CLOUD_DEFER {
-        INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets 
<< " range=["
-                           << hex(rs_start) << "," << hex(rs_end) << "]";
-    };
 
-    auto rs_start1 = rs_start;
-    do {
-        TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
-        if (err != TxnErrorCode::TXN_OK) {
-            code = cast_as<ErrCategory::READ>(err);
-            SS << "internal error, failed to get rowset range, err=" << err
-               << " tablet_id=" << tablet_id << " range=[" << hex(rs_start1) 
<< ", << "
-               << hex(rs_end) << ")";
-            msg = ss.str();
-            return;
+    auto handle_compaction_input_rowset_meta = [&](doris::RowsetMetaCloudPB 
rs) {
+        // remove delete bitmap of input rowset for MoW table
+        if (compaction.has_delete_bitmap_lock_initiator()) {
+            auto delete_bitmap_start =
+                    meta_delete_bitmap_key({instance_id, tablet_id, 
rs.rowset_id_v2(), 0, 0});
+            auto delete_bitmap_end = meta_delete_bitmap_key(
+                    {instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX, 
INT64_MAX});
+            txn->remove(delete_bitmap_start, delete_bitmap_end);
         }
 
-        while (it->has_next()) {
-            auto [k, v] = it->next();
-
-            doris::RowsetMetaCloudPB rs;
-            if (!rs.ParseFromArray(v.data(), v.size())) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                SS << "malformed rowset meta, unable to deserialize, 
tablet_id=" << tablet_id
-                   << " key=" << hex(k);
-                msg = ss.str();
-                return;
-            }
-
-            // remove delete bitmap of input rowset for MoW table
-            if (compaction.has_delete_bitmap_lock_initiator()) {
-                auto delete_bitmap_start =
-                        meta_delete_bitmap_key({instance_id, tablet_id, 
rs.rowset_id_v2(), 0, 0});
-                auto delete_bitmap_end = meta_delete_bitmap_key(
-                        {instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX, 
INT64_MAX});
-                txn->remove(delete_bitmap_start, delete_bitmap_end);
-            }
+        auto recycle_key = recycle_rowset_key({instance_id, tablet_id, 
rs.rowset_id_v2()});
+        RecycleRowsetPB recycle_rowset;
+        recycle_rowset.set_creation_time(now);
+        recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
+        recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
 
-            auto recycle_key = recycle_rowset_key({instance_id, tablet_id, 
rs.rowset_id_v2()});
-            RecycleRowsetPB recycle_rowset;
-            recycle_rowset.set_creation_time(now);
-            recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
-            recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
-
-            if (is_versioned_write) {
-                compaction_log.add_recycle_rowsets()->Swap(&recycle_rowset);
-            } else {
-                auto recycle_val = recycle_rowset.SerializeAsString();
-                txn->put(recycle_key, recycle_val);
-            }
-
-            INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
-                               << " key=" << hex(recycle_key);
+        if (is_versioned_write) {
+            compaction_log.add_recycle_rowsets()->Swap(&recycle_rowset);
+        } else {
+            auto recycle_val = recycle_rowset.SerializeAsString();
+            txn->put(recycle_key, recycle_val);
+        }
 
-            ++num_rowsets;
-            if (!it->has_next()) rs_start1 = k;
+        INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
+                           << " key=" << hex(recycle_key);
+    };
+    if (!is_versioned_read) {
+        std::tie(code, msg) =
+                scan_compaction_input_rowsets(txn.get(), instance_id, 
tablet_id, rs_start, rs_end,
+                                              num_rowsets, 
handle_compaction_input_rowset_meta);
+        if (code != MetaServiceCode::OK) {
+            LOG(WARNING) << msg;
+            return;
         }
-        rs_start1.push_back('\x00'); // Update to next smallest key for 
iteration
-    } while (it->more());
+    } else {
+        std::vector<RowsetMetaCloudPB> rowset_metas;
+        TxnErrorCode err =
+                meta_reader.get_rowset_metas(txn.get(), tablet_id, start, end, 
&rowset_metas);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get rowset metas, tablet_id={}, 
start={}, end={}, err={}",
+                              tablet_id, start, end, err);
+            LOG(WARNING) << msg;
+            return;
+        }
+        num_rowsets = rowset_metas.size();
+        for (auto&& rowset_meta : rowset_metas) {
+            handle_compaction_input_rowset_meta(std::move(rowset_meta));
+        }
+    }
 
     txn->remove(rs_start, rs_end);
 
@@ -1243,6 +1290,9 @@ void process_compaction_job(MetaServiceCode& code, 
std::string& msg, std::string
         std::string operation_log_key = versioned::log_key({instance_id});
         std::string operation_log_value;
         OperationLogPB operation_log;
+        if (is_versioned_read) {
+            operation_log.set_min_timestamp(meta_reader.min_read_version());
+        }
         operation_log.mutable_compaction()->Swap(&compaction_log);
         if (!operation_log.SerializeToString(&operation_log_value)) {
             code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index 9a170c93e9d..b8b92364351 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -34,12 +34,56 @@
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/meta_service.h"
+#include "meta-store/document_message.h"
 #include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
 #include "meta-store/txn_kv.h"
 #include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
 
 namespace doris::cloud {
+// External functions from meta_service_test.cpp
 extern std::unique_ptr<MetaServiceProxy> get_meta_service();
+extern std::unique_ptr<MetaServiceProxy> get_meta_service(bool 
mock_resource_mgr);
+extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, 
int64_t index_id,
+                          int64_t partition_id, int64_t tablet_id);
+extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t 
tablet_id, int partition_id,
+                                              int64_t version, int num_rows);
+extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const 
std::string& label,
+                          int64_t table_id, int64_t partition_id, int64_t 
tablet_id);
+extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t 
index_id,
+                       int64_t partition_id, int64_t tablet_id);
+extern void get_tablet_stats(MetaServiceProxy* meta_service, int64_t table_id, 
int64_t index_id,
+                             int64_t partition_id, int64_t tablet_id, 
GetTabletStatsResponse& res);
+extern void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t 
table_id,
+                                     int64_t index_id, int64_t partition_id, 
int64_t tablet_id,
+                                     int64_t txn_id);
+
+// Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
+static void create_and_refresh_instance(MetaServiceProxy* service, std::string 
instance_id) {
+    // write instance
+    InstanceInfoPB instance_info;
+    instance_info.set_instance_id(instance_id);
+    instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+    txn->put(instance_key(instance_id), instance_info.SerializeAsString());
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    service->resource_mgr()->refresh_instance(instance_id);
+    
ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id));
+}
+
+#define MOCK_GET_INSTANCE_ID(instance_id)                                      
    \
+    DORIS_CLOUD_DEFER {                                                        
    \
+        SyncPoint::get_instance()->clear_all_call_backs();                     
    \
+    };                                                                         
    \
+    SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& 
args) { \
+        auto* ret = try_any_cast_ret<std::string>(args);                       
    \
+        ret->first = instance_id;                                              
    \
+        ret->second = true;                                                    
    \
+    });                                                                        
    \
+    SyncPoint::get_instance()->enable_processing();
 
 namespace {
 const std::string instance_id = "MetaServiceJobTest";
@@ -105,9 +149,10 @@ doris::RowsetMetaCloudPB create_rowset(int64_t tablet_id, 
int64_t start_version,
 }
 
 void commit_rowset(MetaService* meta_service, const doris::RowsetMetaCloudPB& 
rowset,
-                   CreateRowsetResponse& res) {
+                   CreateRowsetResponse& res, int txn_id = 1) {
     brpc::Controller cntl;
     CreateRowsetRequest req;
+    req.set_txn_id(txn_id);
     req.mutable_rowset_meta()->CopyFrom(rowset);
     meta_service->commit_rowset(&cntl, &req, &res, nullptr);
 }
@@ -224,6 +269,7 @@ void create_tablet(MetaService* meta_service, int64_t 
table_id, int64_t index_id
     brpc::Controller cntl;
     CreateTabletsRequest req;
     CreateTabletsResponse res;
+    req.set_db_id(1);
     auto tablet = req.add_tablet_metas();
     tablet->set_tablet_state(not_ready ? doris::TabletStatePB::PB_NOTREADY
                                        : doris::TabletStatePB::PB_RUNNING);
@@ -1132,6 +1178,175 @@ TEST(MetaServiceJobTest, CompactionJobTest) {
     ASSERT_NO_FATAL_FAILURE(test_abort_compaction_job(1, 2, 3, 7));
 }
 
+TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t table_id = 1, index_id = 2, partition_id = 3, tablet_id = 4;
+    {
+        // Create tablet
+        create_tablet(meta_service.get(), table_id, index_id, partition_id, 
tablet_id, true);
+    }
+
+    {
+        // Create rowsets
+        insert_rowset(meta_service.get(), 1, "commit_rowset_1", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_2", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_3", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_4", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_5", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_6", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_7", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_8", table_id, 
partition_id, tablet_id);
+    }
+
+    struct TestCase {
+        TabletCompactionJobPB::CompactionType type;
+        int64_t start_version, end_version;
+        int64_t num_input_rowsets;
+    };
+
+    std::vector<TestCase> cases = {
+            {TabletCompactionJobPB::CUMULATIVE, 7, 8, 2},
+            {TabletCompactionJobPB::BASE, 2, 5, 4},
+            {TabletCompactionJobPB::CUMULATIVE, 6, 8, 2},
+    };
+
+    auto get_tablet_stats = [&]() -> TabletStatsPB {
+        GetTabletStatsRequest get_tablet_stats_req;
+        get_tablet_stats_req.set_cloud_unique_id(cloud_unique_id);
+        auto* tablet_idx = get_tablet_stats_req.add_tablet_idx();
+        tablet_idx->set_tablet_id(tablet_id);
+        tablet_idx->set_db_id(1);
+        tablet_idx->set_index_id(index_id);
+        tablet_idx->set_partition_id(partition_id);
+        tablet_idx->set_table_id(table_id);
+        GetTabletStatsResponse get_tablet_stats_resp;
+        brpc::Controller cntl;
+        meta_service->get_tablet_stats(&cntl, &get_tablet_stats_req, 
&get_tablet_stats_resp,
+                                       nullptr);
+        EXPECT_EQ(get_tablet_stats_resp.status().code(), MetaServiceCode::OK);
+        EXPECT_EQ(get_tablet_stats_resp.tablet_stats_size(), 1);
+        return get_tablet_stats_resp.tablet_stats(0);
+    };
+
+    int base_cnt = 0, cumu_cnt = 0;
+    int64_t txn_id = 123321;
+    for (auto& tc : cases) {
+        std::string job_id = fmt::format("job_{}_{}", base_cnt, cumu_cnt);
+        TabletCompactionJobPB::CompactionType type = tc.type;
+
+        {
+            // Start compaction job
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, job_id, 
"ip:port", base_cnt,
+                                 cumu_cnt, type, res);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        txn_id += 1;
+        {
+            // Put tmp rowset
+            auto tmp_rowset = create_rowset(tablet_id, tc.start_version, 
tc.end_version, 100);
+            tmp_rowset.set_txn_id(txn_id);
+            CreateRowsetResponse res;
+            commit_rowset(meta_service.get(), tmp_rowset, res, txn_id);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+
+        auto tablet_stats_pb = get_tablet_stats();
+
+        {
+            // Commit compaction job.
+            FinishTabletJobRequest req;
+            FinishTabletJobResponse res;
+
+            req.set_action(FinishTabletJobRequest::COMMIT);
+            req.mutable_job()->mutable_idx()->set_table_id(table_id);
+            req.mutable_job()->mutable_idx()->set_index_id(index_id);
+            req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
+            req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+            auto compaction = req.mutable_job()->add_compaction();
+            compaction->set_id(job_id);
+            compaction->set_initiator("ip:port");
+            compaction->set_base_compaction_cnt(base_cnt);
+            compaction->set_cumulative_compaction_cnt(cumu_cnt);
+            compaction->add_txn_id(txn_id);
+
+            std::mt19937 
rng(std::chrono::system_clock::now().time_since_epoch().count());
+            std::uniform_int_distribution<int> dist(1, 10000); // Positive 
numbers
+
+            compaction->set_output_cumulative_point(2);
+            compaction->set_num_output_rows(100);
+            compaction->set_num_output_rowsets(1);
+            compaction->set_num_output_segments(1);
+            compaction->set_num_input_rows(tc.num_input_rowsets * 100);
+            compaction->set_num_input_rowsets(tc.num_input_rowsets);
+            compaction->set_num_input_segments(tc.num_input_rowsets);
+            compaction->set_size_input_rowsets(tc.num_input_rowsets * 100 * 
110);
+            compaction->set_size_output_rowsets(100 * 110);
+            compaction->set_index_size_input_rowsets(tc.num_input_rowsets * 
100 * 10);
+            compaction->set_segment_size_output_rowsets(100 * 110);
+            compaction->set_index_size_input_rowsets(tc.num_input_rowsets * 
100 * 10);
+            compaction->set_segment_size_output_rowsets(100 * 110);
+            compaction->set_type(type);
+            compaction->add_input_versions(tc.start_version);
+            compaction->add_input_versions(tc.end_version);
+            compaction->add_output_versions(tc.end_version);
+            compaction->add_output_rowset_ids("output rowset id");
+
+            SyncPoint::get_instance()->set_call_back(
+                    "process_compaction_job::loop_input_done", [&](auto&& 
args) {
+                        auto* num_input_rowsets = try_any_cast<int*>(args[0]);
+                        ASSERT_EQ(*num_input_rowsets, tc.num_input_rowsets);
+                    });
+
+            brpc::Controller cntl;
+            meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+            if (type == TabletCompactionJobPB::BASE) {
+                base_cnt++;
+            } else {
+                cumu_cnt++;
+            }
+            auto stats = get_tablet_stats();
+
+            EXPECT_EQ(stats.base_compaction_cnt(),
+                      tablet_stats_pb.base_compaction_cnt() +
+                              (req.job().compaction(0).type() == 
TabletCompactionJobPB::BASE));
+            EXPECT_EQ(
+                    stats.cumulative_compaction_cnt(),
+                    tablet_stats_pb.cumulative_compaction_cnt() +
+                            (req.job().compaction(0).type() == 
TabletCompactionJobPB::CUMULATIVE));
+            EXPECT_EQ(stats.cumulative_point(), 
req.job().compaction(0).output_cumulative_point());
+            EXPECT_EQ(stats.num_rows(),
+                      tablet_stats_pb.num_rows() + 
(req.job().compaction(0).num_output_rows() -
+                                                    
req.job().compaction(0).num_input_rows()));
+            EXPECT_EQ(stats.data_size(),
+                      tablet_stats_pb.data_size() + 
(req.job().compaction(0).size_output_rowsets() -
+                                                     
req.job().compaction(0).size_input_rowsets()));
+            EXPECT_EQ(stats.num_rowsets(), tablet_stats_pb.num_rowsets() +
+                                                   
(req.job().compaction(0).num_output_rowsets() -
+                                                    
req.job().compaction(0).num_input_rowsets()));
+            EXPECT_EQ(stats.num_segments(), tablet_stats_pb.num_segments() +
+                                                    
(req.job().compaction(0).num_output_segments() -
+                                                     
req.job().compaction(0).num_input_segments()));
+            EXPECT_EQ(stats.index_size(),
+                      tablet_stats_pb.index_size() +
+                              
(req.job().compaction(0).index_size_output_rowsets() -
+                               
req.job().compaction(0).index_size_input_rowsets()));
+            EXPECT_EQ(stats.segment_size(),
+                      tablet_stats_pb.segment_size() +
+                              
(req.job().compaction(0).segment_size_output_rowsets() -
+                               
req.job().compaction(0).segment_size_input_rowsets()));
+        }
+    }
+}
+
 void check_delete_bitmap_lock(MetaServiceProxy* meta_service, std::string 
instance_id,
                               int64_t table_id, int64_t lock_id, bool exist) {
     std::unique_ptr<Transaction> txn;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to