This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8445855cf25 [fix](cloud) Fix schema change stats id and cumulative point (#55703)
8445855cf25 is described below
commit 8445855cf25a57b25f6703ade3dcfc51852e4091
Author: walter <[email protected]>
AuthorDate: Fri Sep 5 17:34:31 2025 +0800
[fix](cloud) Fix schema change stats id and cumulative point (#55703)
---
cloud/src/meta-service/meta_service_job.cpp | 86 ++++++++++++++++++++---------
cloud/test/meta_service_job_test.cpp | 9 ++-
2 files changed, 68 insertions(+), 27 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 21fe8b9663b..b2e657c74f6 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1341,6 +1341,29 @@ void schema_change_update_tablet_stats(const TabletSchemaChangeJobPB& schema_cha
segment_size_remove_rowsets));
}
+std::pair<TabletStatsPB, TabletStatsPB>
split_tablet_stats_into_load_and_compact_parts(
+ const TabletStatsPB& stats) {
+ TabletStatsPB load_stats, compact_stats;
+ compact_stats.set_base_compaction_cnt(stats.base_compaction_cnt());
+
compact_stats.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+ compact_stats.set_cumulative_point(stats.cumulative_point());
+
compact_stats.set_last_base_compaction_time_ms(stats.last_base_compaction_time_ms());
+
compact_stats.set_last_cumu_compaction_time_ms(stats.last_cumu_compaction_time_ms());
+ compact_stats.set_full_compaction_cnt(stats.full_compaction_cnt());
+
compact_stats.set_last_full_compaction_time_ms(stats.last_full_compaction_time_ms());
+ compact_stats.mutable_idx()->CopyFrom(stats.idx());
+
+ load_stats.set_num_rows(stats.num_rows());
+ load_stats.set_num_rowsets(stats.num_rowsets());
+ load_stats.set_num_segments(stats.num_segments());
+ load_stats.set_data_size(stats.data_size());
+ load_stats.set_index_size(stats.index_size());
+ load_stats.set_segment_size(stats.segment_size());
+ load_stats.mutable_idx()->CopyFrom(stats.idx());
+
+ return {load_stats, compact_stats};
+}
+
std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
Transaction* txn, std::string_view instance_id, int64_t new_tablet_id,
std::string& rs_start, std::string& rs_end, auto&& callback) {
@@ -1708,12 +1731,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}
} else {
std::vector<RowsetMetaCloudPB> rowset_metas;
- TxnErrorCode err = reader.get_rowset_metas(txn.get(), tablet_id, 2,
+ TxnErrorCode err = reader.get_rowset_metas(txn.get(), new_tablet_id, 2,
schema_change.alter_version(), &rowset_metas);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
- msg = fmt::format("failed to get rowset metas, tablet_id={},
start={}, end={}, err={}",
- tablet_id, 2, schema_change.alter_version(),
err);
+ msg = fmt::format(
+ "failed to get rowset metas, new_tablet_id={}, start={},
end={}, err={}",
+ new_tablet_id, 2, schema_change.alter_version(), err);
LOG(WARNING) << msg;
return;
}
@@ -1730,10 +1754,12 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
auto stats = response->mutable_stats();
TabletStats detached_stats;
if (is_versioned_read) {
- TxnErrorCode err = reader.get_tablet_load_stats(txn.get(), tablet_id,
stats, nullptr, true);
+ TxnErrorCode err =
+ reader.get_tablet_merged_stats(txn.get(), new_tablet_id,
stats, nullptr, true);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
- msg = fmt::format("failed to get tablet stats, tablet_id={},
err={}", tablet_id, err);
+ msg = fmt::format("failed to get tablet stats, tablet_id={},
err={}", new_tablet_id,
+ err);
LOG(WARNING) << msg;
return;
}
@@ -1754,48 +1780,56 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}
if (is_versioned_write) {
// read new TabletLoadStatsKey -> TabletStatsPB
- TabletStatsPB new_tablet_load_stats;
+ TabletStatsPB new_tablet_stats;
MetaReader meta_reader(instance_id, txn_kv);
Versionstamp* versionstamp = nullptr;
TxnErrorCode err = TxnErrorCode::TXN_OK;
if (is_versioned_read) {
- new_tablet_load_stats.CopyFrom(*stats);
+ new_tablet_stats.CopyFrom(*stats);
} else {
- err = meta_reader.get_tablet_load_stats(txn.get(), new_tablet_id,
- &new_tablet_load_stats,
versionstamp, false);
+ err = meta_reader.get_tablet_merged_stats(txn.get(),
new_tablet_id, &new_tablet_stats,
+ versionstamp, false);
}
if (err == TxnErrorCode::TXN_OK) {
// new_tablet_load_stats exists, update TabletStatsPB
- schema_change_update_tablet_stats(
- schema_change, &new_tablet_load_stats, num_remove_rows,
size_remove_rowsets,
- num_remove_rowsets, num_remove_segments,
index_size_remove_rowsets,
- segment_size_remove_rowsets);
+ schema_change_update_tablet_stats(schema_change,
&new_tablet_stats, num_remove_rows,
+ size_remove_rowsets,
num_remove_rowsets,
+ num_remove_segments,
index_size_remove_rowsets,
+ segment_size_remove_rowsets);
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
// First time switching from single write to double write mode
// Step 1: Copy from single version stats as baseline
- new_tablet_load_stats.CopyFrom(*stats);
+ new_tablet_stats.CopyFrom(*stats);
// Step 2: Apply schema change updates
- schema_change_update_tablet_stats(
- schema_change, &new_tablet_load_stats, num_remove_rows,
size_remove_rowsets,
- num_remove_rowsets, num_remove_segments,
index_size_remove_rowsets,
- segment_size_remove_rowsets);
+ schema_change_update_tablet_stats(schema_change,
&new_tablet_stats, num_remove_rows,
+ size_remove_rowsets,
num_remove_rowsets,
+ num_remove_segments,
index_size_remove_rowsets,
+ segment_size_remove_rowsets);
} else if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet compact stats,
tablet_id={}, err={}", tablet_id,
err);
return;
}
- // Write new TabletLoadStatsKey -> TabletStatsPB for versioned storage
- auto new_tablet_load_stats_val =
new_tablet_load_stats.SerializeAsString();
- std::string new_tablet_load_stats_version_key =
- versioned::tablet_load_stats_key({instance_id, new_tablet_id});
- LOG_INFO("put versioned tablet compact stats key")
- .tag("new_tablet_load_stats_version_key",
hex(new_tablet_load_stats_version_key))
+
+ auto [load_stats, compact_stats] =
+
split_tablet_stats_into_load_and_compact_parts(new_tablet_stats);
+ std::string load_value = load_stats.SerializeAsString();
+ std::string compact_value = compact_stats.SerializeAsString();
+ std::string load_stats_key =
versioned::tablet_load_stats_key({instance_id, new_tablet_id});
+ std::string compact_stats_key =
+ versioned::tablet_compact_stats_key({instance_id,
new_tablet_id});
+
+ LOG_INFO("put versioned tablet load/compact stats key")
.tag("tablet_id", tablet_id)
.tag("new_tablet_id", new_tablet_id)
- .tag("value_size", new_tablet_load_stats_val.size())
+ .tag("load_value_size", load_value.size())
+ .tag("compact_value_size", compact_value.size())
+ .tag("load_stats_key", hex(load_stats_key))
+ .tag("compact_stats_key", hex(compact_stats_key))
.tag("instance_id", instance_id);
- versioned_put(txn.get(), new_tablet_load_stats_version_key,
new_tablet_load_stats_val);
+ versioned_put(txn.get(), load_stats_key, load_value);
+ versioned_put(txn.get(), compact_stats_key, compact_value);
}
schema_change_update_tablet_stats(schema_change, stats, num_remove_rows,
size_remove_rowsets,
num_remove_rowsets, num_remove_segments,
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index 8d7d805aa66..e53ed77fb35 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1425,7 +1425,9 @@ TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
{tablet_id, new_tablet_id});
}
- // Create output rowsets for new tablet
+ // Now old table has rowsets [1, 2, 3, 4, 5], and new tablet has [4, 5]
+
+ // Create output rowsets for new tablet [1, 2, 3]
std::vector<doris::RowsetMetaCloudPB> output_rowsets;
for (int64_t i = 0; i < 3; ++i) {
auto rowset = create_rowset(new_tablet_id, i + 2, i + 2, 100);
@@ -1471,6 +1473,8 @@ TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
schema_change->set_size_output_rowsets(300 * 110);
schema_change->set_index_size_output_rowsets(300 * 10);
schema_change->set_segment_size_output_rowsets(300 * 110);
+ schema_change->set_output_cumulative_point(
+ 4); // cumulative point from the old table to the new one.
brpc::Controller cntl;
meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
@@ -1496,6 +1500,8 @@ TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
EXPECT_EQ(new_stats.segment_size(),
new_tablet_stats_pb.segment_size() +
req.job().schema_change().segment_size_output_rowsets());
+ EXPECT_EQ(new_stats.cumulative_point(),
+ req.job().schema_change().output_cumulative_point());
}
{
@@ -1524,6 +1530,7 @@ TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
ASSERT_EQ(resp.rowset_meta(1).end_version(), 6);
}
+ new_tablet_stats_pb = get_tablet_stats(new_tablet_id);
{
// Get the rowset metas of the new tablet
GetRowsetRequest req;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]