This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e72f6e3e940 [feat](cloud) process_schema_change_job supports versioned
read and save min read version (#55313)
e72f6e3e940 is described below
commit e72f6e3e940323cfb0c7a6165502bf596a9253d2
Author: abmdocrt <[email protected]>
AuthorDate: Wed Aug 27 09:03:36 2025 +0800
[feat](cloud) process_schema_change_job supports versioned read and save
min read version (#55313)
---
cloud/src/meta-service/meta_service_job.cpp | 170 ++++++++++++++++++----------
cloud/test/meta_service_job_test.cpp | 140 +++++++++++++++++++++++
2 files changed, 250 insertions(+), 60 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 12c45a6759f..53c9361a633 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -17,6 +17,7 @@
#include <brpc/closure_guard.h>
#include <brpc/controller.h>
+#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
@@ -1339,6 +1340,41 @@ void schema_change_update_tablet_stats(const
TabletSchemaChangeJobPB& schema_cha
segment_size_remove_rowsets));
}
+// Scans the rowset metas stored under the key range [rs_start, rs_end) and
+// hands each successfully parsed meta to `callback`.
+//
+// The range is read in batches. After a batch is drained, the scan resumes
+// from the last key seen with a trailing '\x00' appended, and keeps going for
+// as long as the iterator reports more data.
+//
+// @param txn            transaction used for the range reads
+// @param instance_id    not referenced by the scan itself
+// @param new_tablet_id  only used when building error messages
+// @param rs_start       start key of the range to scan
+// @param rs_end         end key of the range to scan
+// @param callback       invoked once per rowset with a doris::RowsetMetaCloudPB rvalue
+// @return {OK, ""} when the whole range was scanned; {KV_TXN_GET_ERR, msg}
+//         when a range read fails; {PROTOBUF_PARSE_ERR, msg} when a value
+//         cannot be parsed. `callback` may already have been invoked for
+//         earlier rowsets when an error is returned.
+std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
+        Transaction* txn, std::string_view instance_id, int64_t new_tablet_id,
+        const std::string& rs_start, const std::string& rs_end, auto&& callback) {
+    std::unique_ptr<RangeGetIterator> it;
+    std::string batch_start = rs_start;
+    do {
+        TxnErrorCode err = txn->get(batch_start, rs_end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            // Report the start key of the batch that actually failed rather
+            // than the start of the whole range.
+            return {MetaServiceCode::KV_TXN_GET_ERR,
+                    fmt::format("internal error, failed to get rowset range, err={} new_tablet_id={} "
+                                "range=[{}, {})",
+                                err, new_tablet_id, hex(batch_start), hex(rs_end))};
+        }
+
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+
+            doris::RowsetMetaCloudPB rs;
+            if (!rs.ParseFromArray(v.data(), v.size())) {
+                return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+                        fmt::format("malformed rowset meta, unable to deserialize, "
+                                    "new_tablet_id={} key={}",
+                                    new_tablet_id, hex(k))};
+            }
+
+            callback(std::move(rs));
+
+            // Remember the last key of this batch as the resume point.
+            if (!it->has_next()) batch_start = k;
+        }
+        batch_start.push_back('\x00'); // Update to next smallest key for iteration
+    } while (it->more());
+    return {MetaServiceCode::OK, ""};
+}
+
void process_schema_change_job(MetaServiceCode& code, std::string& msg,
std::stringstream& ss,
std::unique_ptr<Transaction>& txn,
const FinishTabletJobRequest* request,
@@ -1638,56 +1674,52 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
auto rs_start = meta_rowset_key({instance_id, new_tablet_id, 2});
auto rs_end = meta_rowset_key({instance_id, new_tablet_id,
schema_change.alter_version() + 1});
- std::unique_ptr<RangeGetIterator> it;
- auto rs_start1 = rs_start;
- do {
- TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
+ auto handle_schema_change_input_rowset_meta = [&](doris::RowsetMetaCloudPB
rs) {
+ num_remove_rows += rs.num_rows();
+ size_remove_rowsets += rs.total_disk_size();
+ ++num_remove_rowsets;
+ num_remove_segments += rs.num_segments();
+ index_size_remove_rowsets += rs.index_disk_size();
+ segment_size_remove_rowsets += rs.data_disk_size();
+
+ auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id,
rs.rowset_id_v2()});
+ RecycleRowsetPB recycle_rowset;
+ recycle_rowset.set_creation_time(now);
+ recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
+ recycle_rowset.set_type(RecycleRowsetPB::DROP);
+ if (is_versioned_write) {
+ schema_change_log.add_recycle_rowsets()->Swap(&recycle_rowset);
+ } else {
+ auto recycle_val = recycle_rowset.SerializeAsString();
+ txn->put(recycle_key, recycle_val);
+ }
+ INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" <<
new_tablet_id
+ << " key=" << hex(recycle_key);
+ };
+
+ if (!is_versioned_read) {
+ std::tie(code, msg) =
+ scan_schema_change_input_rowsets(txn.get(), instance_id,
new_tablet_id, rs_start,
+ rs_end,
handle_schema_change_input_rowset_meta);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else {
+ std::vector<RowsetMetaCloudPB> rowset_metas;
+ TxnErrorCode err = reader.get_rowset_metas(
+ txn.get(), tablet_id, 2, schema_change.alter_version() + 1,
&rowset_metas);
if (err != TxnErrorCode::TXN_OK) {
- code = MetaServiceCode::KV_TXN_GET_ERR;
- SS << "internal error, failed to get rowset range, err=" << err
- << " tablet_id=" << new_tablet_id << " range=[" <<
hex(rs_start1) << ", << "
- << hex(rs_end) << ")";
- msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get rowset metas, tablet_id={},
start={}, end={}, err={}",
+ tablet_id, 2, schema_change.alter_version() + 1,
err);
+ LOG(WARNING) << msg;
return;
}
-
- while (it->has_next()) {
- auto [k, v] = it->next();
-
- doris::RowsetMetaCloudPB rs;
- if (!rs.ParseFromArray(v.data(), v.size())) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- SS << "malformed rowset meta, unable to deserialize,
tablet_id=" << new_tablet_id
- << " key=" << hex(k);
- msg = ss.str();
- return;
- }
-
- num_remove_rows += rs.num_rows();
- size_remove_rowsets += rs.total_disk_size();
- ++num_remove_rowsets;
- num_remove_segments += rs.num_segments();
- index_size_remove_rowsets += rs.index_disk_size();
- segment_size_remove_rowsets += rs.data_disk_size();
-
- auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id,
rs.rowset_id_v2()});
- RecycleRowsetPB recycle_rowset;
- recycle_rowset.set_creation_time(now);
- recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
- recycle_rowset.set_type(RecycleRowsetPB::DROP);
- if (is_versioned_write) {
- schema_change_log.add_recycle_rowsets()->Swap(&recycle_rowset);
- } else {
- auto recycle_val = recycle_rowset.SerializeAsString();
- txn->put(recycle_key, recycle_val);
- }
- INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" <<
new_tablet_id
- << " key=" << hex(recycle_key);
-
- if (!it->has_next()) rs_start1 = k;
+ for (auto&& rowset_meta : rowset_metas) {
+ handle_schema_change_input_rowset_meta(std::move(rowset_meta));
}
- rs_start1.push_back('\x00'); // Update to next smallest key for
iteration
- } while (it->more());
+ }
txn->remove(rs_start, rs_end);
@@ -1696,26 +1728,41 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
//==========================================================================
auto stats = response->mutable_stats();
TabletStats detached_stats;
- // ATTN: The condition that snapshot read can be used to get tablet stats
is: all other transactions that put tablet stats
- // can make read write conflicts with this transaction on other keys.
Currently, if all meta-service nodes are running
- // with `config::split_tablet_stats = true` can meet the condition.
- internal_get_tablet_stats(code, msg, txn.get(), instance_id,
new_tablet_idx, *stats,
- detached_stats,
config::snapshot_get_tablet_stats);
- if (code != MetaServiceCode::OK) {
- LOG_WARNING("failed to get tablet stats")
- .tag("instance_id", instance_id)
- .tag("tablet_id", tablet_id)
- .tag("code", code)
- .tag("msg", msg);
- return;
+ if (is_versioned_read) {
+ TxnErrorCode err = reader.get_tablet_load_stats(txn.get(), tablet_id,
stats, nullptr, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet stats, tablet_id={},
err={}", tablet_id, err);
+ LOG(WARNING) << msg;
+ return;
+ }
+ } else {
+ // ATTN: The condition that snapshot read can be used to get tablet
stats is: all other transactions that put tablet stats
+ // can make read write conflicts with this transaction on other keys.
Currently, if all meta-service nodes are running
+ // with `config::split_tablet_stats = true` can meet the condition.
+ internal_get_tablet_stats(code, msg, txn.get(), instance_id,
new_tablet_idx, *stats,
+ detached_stats,
config::snapshot_get_tablet_stats);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("failed to get tablet stats")
+ .tag("instance_id", instance_id)
+ .tag("tablet_id", tablet_id)
+ .tag("code", code)
+ .tag("msg", msg);
+ return;
+ }
}
if (is_versioned_write) {
// read new TabletLoadStatsKey -> TabletStatsPB
TabletStatsPB new_tablet_load_stats;
MetaReader meta_reader(instance_id, txn_kv);
Versionstamp* versionstamp = nullptr;
- TxnErrorCode err = meta_reader.get_tablet_load_stats(
- txn.get(), new_tablet_id, &new_tablet_load_stats,
versionstamp, false);
+ TxnErrorCode err = TxnErrorCode::TXN_OK;
+ if (is_versioned_read) {
+ new_tablet_load_stats.CopyFrom(*stats);
+ } else {
+ err = meta_reader.get_tablet_load_stats(txn.get(), new_tablet_id,
+ &new_tablet_load_stats,
versionstamp, false);
+ }
if (err == TxnErrorCode::TXN_OK) {
// new_tablet_load_stats exists, update TabletStatsPB
schema_change_update_tablet_stats(
@@ -1851,6 +1898,9 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
std::string operation_log_key = versioned::log_key({instance_id});
std::string operation_log_value;
OperationLogPB operation_log;
+ if (is_versioned_read) {
+ operation_log.set_min_timestamp(reader.min_read_version());
+ }
operation_log.mutable_schema_change()->Swap(&schema_change_log);
if (!operation_log.SerializeToString(&operation_log_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index b8b92364351..debef00799a 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1347,6 +1347,146 @@ TEST(MetaServiceJobVersionedReadTest,
CompactionJobTest) {
}
}
+TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) { // Runs a schema change job from start_tablet_job through a COMMIT finish_tablet_job and checks that the new tablet's stats grow by the reported output amounts.
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t table_id = 1, index_id = 2, partition_id = 3, tablet_id = 4;
+    int64_t new_tablet_id = 14;
+    {
+        // Create the source tablet (tablet_id) and the schema change target (new_tablet_id); the extra trailing `true` on the second call is presumably a tablet-state flag -- TODO confirm against create_tablet.
+        create_tablet(meta_service.get(), table_id, index_id, partition_id,
tablet_id, false);
+        create_tablet(meta_service.get(), table_id, index_id, partition_id,
new_tablet_id, false,
+                      true);
+    }
+
+    {
+        // Insert four rowsets (labels commit_rowset_1 .. commit_rowset_4) into the old tablet.
+        insert_rowset(meta_service.get(), 1, "commit_rowset_1", table_id,
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_2", table_id,
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_3", table_id,
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_4", table_id,
partition_id, tablet_id);
+    }
+
+    auto get_tablet_stats = [&](int64_t tid) -> TabletStatsPB { // Fetches the stats of tablet `tid` through the get_tablet_stats RPC; expects an OK status and exactly one stats entry.
+        GetTabletStatsRequest get_tablet_stats_req;
+        get_tablet_stats_req.set_cloud_unique_id(cloud_unique_id);
+        auto* tablet_idx = get_tablet_stats_req.add_tablet_idx();
+        tablet_idx->set_tablet_id(tid);
+        tablet_idx->set_db_id(1);
+        tablet_idx->set_index_id(index_id);
+        tablet_idx->set_partition_id(partition_id);
+        tablet_idx->set_table_id(table_id);
+        GetTabletStatsResponse get_tablet_stats_resp;
+        brpc::Controller cntl;
+        meta_service->get_tablet_stats(&cntl, &get_tablet_stats_req,
&get_tablet_stats_resp,
+                                       nullptr);
+        EXPECT_EQ(get_tablet_stats_resp.status().code(), MetaServiceCode::OK);
+        EXPECT_EQ(get_tablet_stats_resp.tablet_stats_size(), 1);
+        return get_tablet_stats_resp.tablet_stats(0);
+    };
+
+    std::string job_id = "schema_change_job_1";
+    int64_t txn_id = 123456;
+
+    {
+        // Start the schema change job from tablet_id to new_tablet_id; the request must be answered with OK.
+        StartTabletJobRequest req;
+        StartTabletJobResponse res;
+
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+        req.mutable_job()->mutable_idx()->set_table_id(table_id);
+        req.mutable_job()->mutable_idx()->set_index_id(index_id);
+        req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
+
+        auto* schema_change = req.mutable_job()->mutable_schema_change();
+        schema_change->set_id(job_id);
+        schema_change->set_initiator("ip:port");
+        schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id);
+        schema_change->mutable_new_tablet_idx()->set_table_id(table_id);
+        schema_change->mutable_new_tablet_idx()->set_index_id(index_id);
+
schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id);
+        schema_change->set_expiration(time(nullptr) + 12);
+
+        brpc::Controller cntl;
+        meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Commit three output rowsets on the new tablet, passing i + 2 for both middle create_rowset arguments (presumably start/end version -- TODO confirm) and txn_id + i as the txn id.
+    std::vector<doris::RowsetMetaCloudPB> output_rowsets;
+    for (int64_t i = 0; i < 3; ++i) {
+        auto rowset = create_rowset(new_tablet_id, i + 2, i + 2, 100);
+        rowset.set_txn_id(txn_id + i);
+        output_rowsets.push_back(rowset);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id +
i);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    auto old_tablet_stats_pb = get_tablet_stats(tablet_id); // NOTE(review): captured but never asserted on below
+    auto new_tablet_stats_pb = get_tablet_stats(new_tablet_id); // baseline for the post-commit comparison
+
+    {
+        // Finish the schema change job with action COMMIT.
+        FinishTabletJobRequest req;
+        FinishTabletJobResponse res;
+
+        req.set_action(FinishTabletJobRequest::COMMIT);
+        req.mutable_job()->mutable_idx()->set_table_id(table_id);
+        req.mutable_job()->mutable_idx()->set_index_id(index_id);
+        req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+
+        auto* schema_change = req.mutable_job()->mutable_schema_change();
+        schema_change->set_id(job_id);
+        schema_change->set_initiator("ip:port");
+        schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id);
+        schema_change->mutable_new_tablet_idx()->set_table_id(table_id);
+        schema_change->mutable_new_tablet_idx()->set_index_id(index_id);
+
schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id);
+        schema_change->set_alter_version(output_rowsets.back().end_version());
+
+        // Report each output rowset's txn id and end version, followed by the aggregate output counts and sizes.
+        for (const auto& rowset : output_rowsets) {
+            schema_change->add_txn_ids(rowset.txn_id());
+            schema_change->add_output_versions(rowset.end_version());
+        }
+        schema_change->set_num_output_rows(300);
+        schema_change->set_num_output_rowsets(3);
+        schema_change->set_num_output_segments(3);
+        schema_change->set_size_output_rowsets(300 * 110);
+        schema_change->set_index_size_output_rowsets(300 * 10);
+        schema_change->set_segment_size_output_rowsets(300 * 110);
+
+        brpc::Controller cntl;
+        meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+        // Each stat of the new tablet must equal its pre-commit value plus the corresponding output amount from the request.
+        auto new_stats = get_tablet_stats(new_tablet_id);
+
+        EXPECT_EQ(new_stats.num_rows(),
+                  new_tablet_stats_pb.num_rows() +
req.job().schema_change().num_output_rows());
+        EXPECT_EQ(new_stats.data_size(), new_tablet_stats_pb.data_size() +
+
req.job().schema_change().size_output_rowsets());
+        EXPECT_EQ(new_stats.num_rowsets(), new_tablet_stats_pb.num_rowsets() +
+
req.job().schema_change().num_output_rowsets());
+        EXPECT_EQ(new_stats.num_segments(),
+                  new_tablet_stats_pb.num_segments() +
+                          req.job().schema_change().num_output_segments());
+        EXPECT_EQ(new_stats.index_size(),
+                  new_tablet_stats_pb.index_size() +
+
req.job().schema_change().index_size_output_rowsets());
+        EXPECT_EQ(new_stats.segment_size(),
+                  new_tablet_stats_pb.segment_size() +
+
req.job().schema_change().segment_size_output_rowsets());
+    }
+}
+
void check_delete_bitmap_lock(MetaServiceProxy* meta_service, std::string
instance_id,
int64_t table_id, int64_t lock_id, bool exist) {
std::unique_ptr<Transaction> txn;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]