This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 007ad37efd8 [feat](cloud) process_compaction_job supports versioned
read and save min read version (#55079)
007ad37efd8 is described below
commit 007ad37efd818e22dd10467162ebe7886f237ab7
Author: walter <[email protected]>
AuthorDate: Mon Aug 25 16:09:12 2025 +0800
[feat](cloud) process_compaction_job supports versioned read and save min
read version (#55079)
---
cloud/src/meta-service/meta_service_job.cpp | 196 +++++++++++++++----------
cloud/test/meta_service_job_test.cpp | 217 +++++++++++++++++++++++++++-
2 files changed, 339 insertions(+), 74 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 6ac1ed02d2d..12c45a6759f 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -784,6 +784,46 @@ int compaction_update_tablet_stats(const
TabletCompactionJobPB& compaction, Tabl
return 0;
}
+std::pair<MetaServiceCode, std::string> scan_compaction_input_rowsets(
+ Transaction* txn, std::string_view instance_id, int64_t tablet_id,
std::string& rs_start,
+ std::string& rs_end, int& num_rowsets, auto&& callback) {
+ std::unique_ptr<RangeGetIterator> it;
+ DORIS_CLOUD_DEFER {
+ INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets
<< " range=["
+ << hex(rs_start) << "," << hex(rs_end) << "]";
+ };
+
+ auto rs_start1 = rs_start;
+ do {
+ TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
+ if (err != TxnErrorCode::TXN_OK) {
+ return {cast_as<ErrCategory::READ>(err),
+ fmt::format("internal error, failed to get rowset range,
err={} tablet_id={} "
+ "range=[{}, {})",
+ err, tablet_id, hex(rs_start), hex(rs_end))};
+ }
+
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+
+ doris::RowsetMetaCloudPB rs;
+ if (!rs.ParseFromArray(v.data(), v.size())) {
+ return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+ fmt::format(
+ "malformed rowset meta, unable to deserialize,
tablet_id={} key={}",
+ tablet_id, hex(k))};
+ }
+
+ callback(std::move(rs));
+
+ ++num_rowsets;
+ if (!it->has_next()) rs_start1 = k;
+ }
+ rs_start1.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return {MetaServiceCode::OK, ""};
+}
+
void process_compaction_job(MetaServiceCode& code, std::string& msg,
std::stringstream& ss,
std::unique_ptr<Transaction>& txn,
const FinishTabletJobRequest* request,
@@ -906,34 +946,51 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
// Update tablet stats
//==========================================================================
auto stats = response->mutable_stats();
+
+ MetaReader meta_reader(instance_id);
TabletStats detached_stats;
- // ATTN: The condition that snapshot read can be used to get tablet stats
is: all other transactions that put tablet stats
- // can make read write conflicts with this transaction on other keys.
Currently, if all meta-service nodes are running
- // with `config::split_tablet_stats = true` can meet the condition.
- internal_get_tablet_stats(code, msg, txn.get(), instance_id,
request->job().idx(), *stats,
- detached_stats,
config::snapshot_get_tablet_stats);
- if (code != MetaServiceCode::OK) {
- LOG_WARNING("failed to get tablet stats")
- .tag("instance_id", instance_id)
- .tag("tablet_id", tablet_id)
- .tag("code", code)
- .tag("msg", msg);
- return;
+ if (is_versioned_read) {
+ TxnErrorCode err =
+ meta_reader.get_tablet_compact_stats(txn.get(), tablet_id,
stats, nullptr, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet stats, tablet_id={},
err={}", tablet_id, err);
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else {
+ // ATTN: The condition that snapshot read can be used to get tablet
stats is: all other transactions that put tablet stats
+ // can make read write conflicts with this transaction on other keys.
Currently, if all meta-service nodes are running
+ // with `config::split_tablet_stats = true` can meet the condition.
+ internal_get_tablet_stats(code, msg, txn.get(), instance_id,
request->job().idx(), *stats,
+ detached_stats,
config::snapshot_get_tablet_stats);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("failed to get tablet stats")
+ .tag("instance_id", instance_id)
+ .tag("tablet_id", tablet_id)
+ .tag("code", code)
+ .tag("msg", msg);
+ return;
+ }
}
if (is_versioned_write) {
// read old TabletCompactStatsKey -> TabletStatsPB
TabletStatsPB tablet_compact_stats;
- MetaReader meta_reader(instance_id, txn_kv);
- Versionstamp* versionstamp = nullptr;
- TxnErrorCode err = meta_reader.get_tablet_compact_stats(
- txn.get(), tablet_id, &tablet_compact_stats, versionstamp,
false);
+ TxnErrorCode err = TxnErrorCode::TXN_OK;
+ if (is_versioned_read) {
+ // Reuse the above txn::get result.
+ tablet_compact_stats.CopyFrom(*stats);
+ } else {
+ err = meta_reader.get_tablet_compact_stats(txn.get(), tablet_id,
&tablet_compact_stats,
+ nullptr, false);
+ }
if (err == TxnErrorCode::TXN_OK) {
// tablet_compact_stats exists, update TabletStatsPB
if (compaction_update_tablet_stats(compaction,
&tablet_compact_stats, code, msg, now) ==
-1) {
LOG_WARNING("compaction_update_tablet_stats failed.")
- .tag("instancej_id", instance_id)
+ .tag("instance_id", instance_id)
.tag("tablet_id", tablet_id)
.tag("compact_stats",
tablet_compact_stats.ShortDebugString());
return;
@@ -998,7 +1055,7 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
<< " compaction.size_input_rowsets=" <<
compaction.size_input_rowsets();
txn->put(stats_key, stats_val);
merge_tablet_stats(*stats, detached_stats); // this is to check
- if (stats->data_size() < 0 || stats->num_rowsets() < 1) [[unlikely]] {
+ if (!is_versioned_read && (stats->data_size() < 0 || stats->num_rowsets()
< 1)) [[unlikely]] {
INSTANCE_LOG(ERROR) << "buggy data size, tablet_id=" << tablet_id
<< " stats.num_rows=" << stats->num_rows()
<< " stats.data_size=" << stats->data_size()
@@ -1062,68 +1119,58 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
compaction_log.set_start_version(start);
compaction_log.set_end_version(end);
-
- std::unique_ptr<RangeGetIterator> it;
int num_rowsets = 0;
- DORIS_CLOUD_DEFER {
- INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets
<< " range=["
- << hex(rs_start) << "," << hex(rs_end) << "]";
- };
- auto rs_start1 = rs_start;
- do {
- TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- SS << "internal error, failed to get rowset range, err=" << err
- << " tablet_id=" << tablet_id << " range=[" << hex(rs_start1)
<< ", << "
- << hex(rs_end) << ")";
- msg = ss.str();
- return;
+ auto handle_compaction_input_rowset_meta = [&](doris::RowsetMetaCloudPB
rs) {
+ // remove delete bitmap of input rowset for MoW table
+ if (compaction.has_delete_bitmap_lock_initiator()) {
+ auto delete_bitmap_start =
+ meta_delete_bitmap_key({instance_id, tablet_id,
rs.rowset_id_v2(), 0, 0});
+ auto delete_bitmap_end = meta_delete_bitmap_key(
+ {instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX,
INT64_MAX});
+ txn->remove(delete_bitmap_start, delete_bitmap_end);
}
- while (it->has_next()) {
- auto [k, v] = it->next();
-
- doris::RowsetMetaCloudPB rs;
- if (!rs.ParseFromArray(v.data(), v.size())) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- SS << "malformed rowset meta, unable to deserialize,
tablet_id=" << tablet_id
- << " key=" << hex(k);
- msg = ss.str();
- return;
- }
-
- // remove delete bitmap of input rowset for MoW table
- if (compaction.has_delete_bitmap_lock_initiator()) {
- auto delete_bitmap_start =
- meta_delete_bitmap_key({instance_id, tablet_id,
rs.rowset_id_v2(), 0, 0});
- auto delete_bitmap_end = meta_delete_bitmap_key(
- {instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX,
INT64_MAX});
- txn->remove(delete_bitmap_start, delete_bitmap_end);
- }
+ auto recycle_key = recycle_rowset_key({instance_id, tablet_id,
rs.rowset_id_v2()});
+ RecycleRowsetPB recycle_rowset;
+ recycle_rowset.set_creation_time(now);
+ recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
+ recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
- auto recycle_key = recycle_rowset_key({instance_id, tablet_id,
rs.rowset_id_v2()});
- RecycleRowsetPB recycle_rowset;
- recycle_rowset.set_creation_time(now);
- recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
- recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
-
- if (is_versioned_write) {
- compaction_log.add_recycle_rowsets()->Swap(&recycle_rowset);
- } else {
- auto recycle_val = recycle_rowset.SerializeAsString();
- txn->put(recycle_key, recycle_val);
- }
-
- INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
- << " key=" << hex(recycle_key);
+ if (is_versioned_write) {
+ compaction_log.add_recycle_rowsets()->Swap(&recycle_rowset);
+ } else {
+ auto recycle_val = recycle_rowset.SerializeAsString();
+ txn->put(recycle_key, recycle_val);
+ }
- ++num_rowsets;
- if (!it->has_next()) rs_start1 = k;
+ INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
+ << " key=" << hex(recycle_key);
+ };
+ if (!is_versioned_read) {
+ std::tie(code, msg) =
+ scan_compaction_input_rowsets(txn.get(), instance_id,
tablet_id, rs_start, rs_end,
+ num_rowsets,
handle_compaction_input_rowset_meta);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << msg;
+ return;
}
- rs_start1.push_back('\x00'); // Update to next smallest key for
iteration
- } while (it->more());
+ } else {
+ std::vector<RowsetMetaCloudPB> rowset_metas;
+ TxnErrorCode err =
+ meta_reader.get_rowset_metas(txn.get(), tablet_id, start, end,
&rowset_metas);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get rowset metas, tablet_id={},
start={}, end={}, err={}",
+ tablet_id, start, end, err);
+ LOG(WARNING) << msg;
+ return;
+ }
+ num_rowsets = rowset_metas.size();
+ for (auto&& rowset_meta : rowset_metas) {
+ handle_compaction_input_rowset_meta(std::move(rowset_meta));
+ }
+ }
txn->remove(rs_start, rs_end);
@@ -1243,6 +1290,9 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
std::string operation_log_key = versioned::log_key({instance_id});
std::string operation_log_value;
OperationLogPB operation_log;
+ if (is_versioned_read) {
+ operation_log.set_min_timestamp(meta_reader.min_read_version());
+ }
operation_log.mutable_compaction()->Swap(&compaction_log);
if (!operation_log.SerializeToString(&operation_log_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index 9a170c93e9d..b8b92364351 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -34,12 +34,56 @@
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
+#include "meta-store/document_message.h"
#include "meta-store/keys.h"
+#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
+#include "meta-store/versioned_value.h"
namespace doris::cloud {
+// External functions from meta_service_test.cpp
extern std::unique_ptr<MetaServiceProxy> get_meta_service();
+extern std::unique_ptr<MetaServiceProxy> get_meta_service(bool
mock_resource_mgr);
+extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id,
int64_t index_id,
+ int64_t partition_id, int64_t tablet_id);
+extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t
tablet_id, int partition_id,
+ int64_t version, int num_rows);
+extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
+ int64_t table_id, int64_t partition_id, int64_t
tablet_id);
+extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t
index_id,
+ int64_t partition_id, int64_t tablet_id);
+extern void get_tablet_stats(MetaServiceProxy* meta_service, int64_t table_id,
int64_t index_id,
+ int64_t partition_id, int64_t tablet_id,
GetTabletStatsResponse& res);
+extern void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t
table_id,
+ int64_t index_id, int64_t partition_id,
int64_t tablet_id,
+ int64_t txn_id);
+
+// Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
+static void create_and_refresh_instance(MetaServiceProxy* service, std::string
instance_id) {
+ // write instance
+ InstanceInfoPB instance_info;
+ instance_info.set_instance_id(instance_id);
+ instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ txn->put(instance_key(instance_id), instance_info.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ service->resource_mgr()->refresh_instance(instance_id);
+
ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id));
+}
+
+#define MOCK_GET_INSTANCE_ID(instance_id)
\
+ DORIS_CLOUD_DEFER {
\
+ SyncPoint::get_instance()->clear_all_call_backs();
\
+ };
\
+ SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&&
args) { \
+ auto* ret = try_any_cast_ret<std::string>(args);
\
+ ret->first = instance_id;
\
+ ret->second = true;
\
+ });
\
+ SyncPoint::get_instance()->enable_processing();
namespace {
const std::string instance_id = "MetaServiceJobTest";
@@ -105,9 +149,10 @@ doris::RowsetMetaCloudPB create_rowset(int64_t tablet_id,
int64_t start_version,
}
void commit_rowset(MetaService* meta_service, const doris::RowsetMetaCloudPB&
rowset,
- CreateRowsetResponse& res) {
+ CreateRowsetResponse& res, int txn_id = 1) {
brpc::Controller cntl;
CreateRowsetRequest req;
+ req.set_txn_id(txn_id);
req.mutable_rowset_meta()->CopyFrom(rowset);
meta_service->commit_rowset(&cntl, &req, &res, nullptr);
}
@@ -224,6 +269,7 @@ void create_tablet(MetaService* meta_service, int64_t
table_id, int64_t index_id
brpc::Controller cntl;
CreateTabletsRequest req;
CreateTabletsResponse res;
+ req.set_db_id(1);
auto tablet = req.add_tablet_metas();
tablet->set_tablet_state(not_ready ? doris::TabletStatePB::PB_NOTREADY
: doris::TabletStatePB::PB_RUNNING);
@@ -1132,6 +1178,175 @@ TEST(MetaServiceJobTest, CompactionJobTest) {
ASSERT_NO_FATAL_FAILURE(test_abort_compaction_job(1, 2, 3, 7));
}
+TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
+ auto meta_service = get_meta_service(false);
+ std::string instance_id = "test_cloud_instance_id";
+ std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+ MOCK_GET_INSTANCE_ID(instance_id);
+ create_and_refresh_instance(meta_service.get(), instance_id);
+
+ int64_t table_id = 1, index_id = 2, partition_id = 3, tablet_id = 4;
+ {
+ // Create tablet
+ create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id, true);
+ }
+
+ {
+ // Create rowsets
+ insert_rowset(meta_service.get(), 1, "commit_rowset_1", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_2", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_3", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_4", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_5", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_6", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_7", table_id,
partition_id, tablet_id);
+ insert_rowset(meta_service.get(), 1, "commit_rowset_8", table_id,
partition_id, tablet_id);
+ }
+
+ struct TestCase {
+ TabletCompactionJobPB::CompactionType type;
+ int64_t start_version, end_version;
+ int64_t num_input_rowsets;
+ };
+
+ std::vector<TestCase> cases = {
+ {TabletCompactionJobPB::CUMULATIVE, 7, 8, 2},
+ {TabletCompactionJobPB::BASE, 2, 5, 4},
+ {TabletCompactionJobPB::CUMULATIVE, 6, 8, 2},
+ };
+
+ auto get_tablet_stats = [&]() -> TabletStatsPB {
+ GetTabletStatsRequest get_tablet_stats_req;
+ get_tablet_stats_req.set_cloud_unique_id(cloud_unique_id);
+ auto* tablet_idx = get_tablet_stats_req.add_tablet_idx();
+ tablet_idx->set_tablet_id(tablet_id);
+ tablet_idx->set_db_id(1);
+ tablet_idx->set_index_id(index_id);
+ tablet_idx->set_partition_id(partition_id);
+ tablet_idx->set_table_id(table_id);
+ GetTabletStatsResponse get_tablet_stats_resp;
+ brpc::Controller cntl;
+ meta_service->get_tablet_stats(&cntl, &get_tablet_stats_req,
&get_tablet_stats_resp,
+ nullptr);
+ EXPECT_EQ(get_tablet_stats_resp.status().code(), MetaServiceCode::OK);
+ EXPECT_EQ(get_tablet_stats_resp.tablet_stats_size(), 1);
+ return get_tablet_stats_resp.tablet_stats(0);
+ };
+
+ int base_cnt = 0, cumu_cnt = 0;
+ int64_t txn_id = 123321;
+ for (auto& tc : cases) {
+ std::string job_id = fmt::format("job_{}_{}", base_cnt, cumu_cnt);
+ TabletCompactionJobPB::CompactionType type = tc.type;
+
+ {
+ // Start compaction job
+ StartTabletJobResponse res;
+ start_compaction_job(meta_service.get(), tablet_id, job_id,
"ip:port", base_cnt,
+ cumu_cnt, type, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ txn_id += 1;
+ {
+ // Put tmp rowset
+ auto tmp_rowset = create_rowset(tablet_id, tc.start_version,
tc.end_version, 100);
+ tmp_rowset.set_txn_id(txn_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res, txn_id);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ auto tablet_stats_pb = get_tablet_stats();
+
+ {
+ // Commit compaction job.
+ FinishTabletJobRequest req;
+ FinishTabletJobResponse res;
+
+ req.set_action(FinishTabletJobRequest::COMMIT);
+ req.mutable_job()->mutable_idx()->set_table_id(table_id);
+ req.mutable_job()->mutable_idx()->set_index_id(index_id);
+ req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
+ req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+ auto compaction = req.mutable_job()->add_compaction();
+ compaction->set_id(job_id);
+ compaction->set_initiator("ip:port");
+ compaction->set_base_compaction_cnt(base_cnt);
+ compaction->set_cumulative_compaction_cnt(cumu_cnt);
+ compaction->add_txn_id(txn_id);
+
+ std::mt19937
rng(std::chrono::system_clock::now().time_since_epoch().count());
+ std::uniform_int_distribution<int> dist(1, 10000); // Positive
numbers
+
+ compaction->set_output_cumulative_point(2);
+ compaction->set_num_output_rows(100);
+ compaction->set_num_output_rowsets(1);
+ compaction->set_num_output_segments(1);
+ compaction->set_num_input_rows(tc.num_input_rowsets * 100);
+ compaction->set_num_input_rowsets(tc.num_input_rowsets);
+ compaction->set_num_input_segments(tc.num_input_rowsets);
+ compaction->set_size_input_rowsets(tc.num_input_rowsets * 100 *
110);
+ compaction->set_size_output_rowsets(100 * 110);
+ compaction->set_index_size_input_rowsets(tc.num_input_rowsets *
100 * 10);
+ compaction->set_segment_size_output_rowsets(100 * 110);
+ compaction->set_index_size_input_rowsets(tc.num_input_rowsets *
100 * 10);
+ compaction->set_segment_size_output_rowsets(100 * 110);
+ compaction->set_type(type);
+ compaction->add_input_versions(tc.start_version);
+ compaction->add_input_versions(tc.end_version);
+ compaction->add_output_versions(tc.end_version);
+ compaction->add_output_rowset_ids("output rowset id");
+
+ SyncPoint::get_instance()->set_call_back(
+ "process_compaction_job::loop_input_done", [&](auto&&
args) {
+ auto* num_input_rowsets = try_any_cast<int*>(args[0]);
+ ASSERT_EQ(*num_input_rowsets, tc.num_input_rowsets);
+ });
+
+ brpc::Controller cntl;
+ meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ if (type == TabletCompactionJobPB::BASE) {
+ base_cnt++;
+ } else {
+ cumu_cnt++;
+ }
+ auto stats = get_tablet_stats();
+
+ EXPECT_EQ(stats.base_compaction_cnt(),
+ tablet_stats_pb.base_compaction_cnt() +
+ (req.job().compaction(0).type() ==
TabletCompactionJobPB::BASE));
+ EXPECT_EQ(
+ stats.cumulative_compaction_cnt(),
+ tablet_stats_pb.cumulative_compaction_cnt() +
+ (req.job().compaction(0).type() ==
TabletCompactionJobPB::CUMULATIVE));
+ EXPECT_EQ(stats.cumulative_point(),
req.job().compaction(0).output_cumulative_point());
+ EXPECT_EQ(stats.num_rows(),
+ tablet_stats_pb.num_rows() +
(req.job().compaction(0).num_output_rows() -
+
req.job().compaction(0).num_input_rows()));
+ EXPECT_EQ(stats.data_size(),
+ tablet_stats_pb.data_size() +
(req.job().compaction(0).size_output_rowsets() -
+
req.job().compaction(0).size_input_rowsets()));
+ EXPECT_EQ(stats.num_rowsets(), tablet_stats_pb.num_rowsets() +
+
(req.job().compaction(0).num_output_rowsets() -
+
req.job().compaction(0).num_input_rowsets()));
+ EXPECT_EQ(stats.num_segments(), tablet_stats_pb.num_segments() +
+
(req.job().compaction(0).num_output_segments() -
+
req.job().compaction(0).num_input_segments()));
+ EXPECT_EQ(stats.index_size(),
+ tablet_stats_pb.index_size() +
+
(req.job().compaction(0).index_size_output_rowsets() -
+
req.job().compaction(0).index_size_input_rowsets()));
+ EXPECT_EQ(stats.segment_size(),
+ tablet_stats_pb.segment_size() +
+
(req.job().compaction(0).segment_size_output_rowsets() -
+
req.job().compaction(0).segment_size_input_rowsets()));
+ }
+ }
+}
+
void check_delete_bitmap_lock(MetaServiceProxy* meta_service, std::string
instance_id,
int64_t table_id, int64_t lock_id, bool exist) {
std::unique_ptr<Transaction> txn;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]