This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 63937237e33 [fix](cloud) Schema change job handles input rowsets in
range [2, alter_version] (#55461)
63937237e33 is described below
commit 63937237e333d021198aecc23af857e43f56df63
Author: walter <[email protected]>
AuthorDate: Sun Aug 31 11:57:55 2025 +0800
[fix](cloud) Schema change job handles input rowsets in range [2,
alter_version] (#55461)
---
cloud/src/meta-service/meta_service_job.cpp | 6 +--
cloud/src/meta-store/meta_reader.h | 2 +-
cloud/test/meta_service_job_test.cpp | 69 +++++++++++++++++++++++++++--
cloud/test/meta_service_test.cpp | 16 +++++++
4 files changed, 86 insertions(+), 7 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 53c9361a633..654a032b6a1 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1707,12 +1707,12 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
}
} else {
std::vector<RowsetMetaCloudPB> rowset_metas;
- TxnErrorCode err = reader.get_rowset_metas(
- txn.get(), tablet_id, 2, schema_change.alter_version() + 1,
&rowset_metas);
+ TxnErrorCode err = reader.get_rowset_metas(txn.get(), tablet_id, 2,
+
schema_change.alter_version(), &rowset_metas);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get rowset metas, tablet_id={},
start={}, end={}, err={}",
- tablet_id, 2, schema_change.alter_version() + 1,
err);
+ tablet_id, 2, schema_change.alter_version(),
err);
LOG(WARNING) << msg;
return;
}
diff --git a/cloud/src/meta-store/meta_reader.h
b/cloud/src/meta-store/meta_reader.h
index d0d630e0893..121dafda051 100644
--- a/cloud/src/meta-store/meta_reader.h
+++ b/cloud/src/meta-store/meta_reader.h
@@ -152,7 +152,7 @@ public:
std::unordered_map<int64_t,
TabletIndexPB>* tablet_indexes,
bool snapshot = false);
- // Get the rowset meta for the given tablet_id and version range.
+ // Get the rowset meta for the given tablet_id and version range
[start_version, end_version].
//
// The `rowset_metas` will be filled with the RowsetMetaCloudPB for each
version in the range,
// in ascending order.
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index debef00799a..8d7d805aa66 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -51,6 +51,8 @@ extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id,
int64_t tablet_id,
int64_t version, int num_rows);
extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
int64_t table_id, int64_t partition_id, int64_t
tablet_id);
+extern void insert_rowsets(MetaServiceProxy* meta_service, int64_t db_id,
const std::string& label,
+ int64_t table_id, int64_t partition_id,
std::vector<int64_t> tablet_ids);
extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t
index_id,
int64_t partition_id, int64_t tablet_id);
extern void get_tablet_stats(MetaServiceProxy* meta_service, int64_t table_id,
int64_t index_id,
@@ -1368,7 +1370,6 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
insert_rowset(meta_service.get(), 1, "commit_rowset_1", table_id,
partition_id, tablet_id);
insert_rowset(meta_service.get(), 1, "commit_rowset_2", table_id,
partition_id, tablet_id);
insert_rowset(meta_service.get(), 1, "commit_rowset_3", table_id,
partition_id, tablet_id);
- insert_rowset(meta_service.get(), 1, "commit_rowset_4", table_id,
partition_id, tablet_id);
}
auto get_tablet_stats = [&](int64_t tid) -> TabletStatsPB {
@@ -1416,6 +1417,14 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}
+ {
+ // Add more rowsets, to ensure the alter version is accurate.
+ insert_rowsets(meta_service.get(), 1, "commit_rowset_4", table_id,
partition_id,
+ {tablet_id, new_tablet_id});
+ insert_rowsets(meta_service.get(), 1, "commit_rowset_5", table_id,
partition_id,
+ {tablet_id, new_tablet_id});
+ }
+
// Create output rowsets for new tablet
std::vector<doris::RowsetMetaCloudPB> output_rowsets;
for (int64_t i = 0; i < 3; ++i) {
@@ -1430,6 +1439,7 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
auto old_tablet_stats_pb = get_tablet_stats(tablet_id);
auto new_tablet_stats_pb = get_tablet_stats(new_tablet_id);
+ int64_t alter_version = output_rowsets.back().end_version();
{
// Finish schema change job
FinishTabletJobRequest req;
@@ -1448,7 +1458,7 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
schema_change->mutable_new_tablet_idx()->set_table_id(table_id);
schema_change->mutable_new_tablet_idx()->set_index_id(index_id);
schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id);
- schema_change->set_alter_version(output_rowsets.back().end_version());
+ schema_change->set_alter_version(alter_version);
// Set output rowsets info
for (const auto& rowset : output_rowsets) {
@@ -1470,7 +1480,9 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
auto new_stats = get_tablet_stats(new_tablet_id);
EXPECT_EQ(new_stats.num_rows(),
- new_tablet_stats_pb.num_rows() +
req.job().schema_change().num_output_rows());
+ new_tablet_stats_pb.num_rows() +
req.job().schema_change().num_output_rows())
+ << "new_stats => " << new_stats.DebugString() <<
"\nnew_tablet_stats => "
+ << new_tablet_stats_pb.DebugString();
EXPECT_EQ(new_stats.data_size(), new_tablet_stats_pb.data_size() +
req.job().schema_change().size_output_rowsets());
EXPECT_EQ(new_stats.num_rowsets(), new_tablet_stats_pb.num_rowsets() +
@@ -1485,6 +1497,57 @@ TEST(MetaServiceJobVersionedReadTest,
SchemaChangeJobTest) {
new_tablet_stats_pb.segment_size() +
req.job().schema_change().segment_size_output_rowsets());
}
+
+ {
+ // Get the rowset metas of the old tablet
+ GetRowsetRequest req;
+ GetRowsetResponse resp;
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_start_version(alter_version + 1);
+ req.set_end_version(-1);
+ req.mutable_idx()->set_db_id(1);
+ req.mutable_idx()->set_table_id(table_id);
+ req.mutable_idx()->set_index_id(index_id);
+ req.mutable_idx()->set_partition_id(partition_id);
+ req.mutable_idx()->set_tablet_id(tablet_id);
+ req.set_base_compaction_cnt(old_tablet_stats_pb.base_compaction_cnt());
+ req.set_cumulative_point(old_tablet_stats_pb.cumulative_point());
+
req.set_cumulative_compaction_cnt(old_tablet_stats_pb.cumulative_compaction_cnt());
+
+ brpc::Controller cntl;
+ meta_service->get_rowset(&cntl, &req, &resp, nullptr);
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(resp.rowset_meta_size(), 2);
+ ASSERT_EQ(resp.rowset_meta(0).start_version(), 5);
+ ASSERT_EQ(resp.rowset_meta(0).end_version(), 5);
+ ASSERT_EQ(resp.rowset_meta(1).start_version(), 6);
+ ASSERT_EQ(resp.rowset_meta(1).end_version(), 6);
+ }
+
+ {
+ // Get the rowset metas of the new tablet
+ GetRowsetRequest req;
+ GetRowsetResponse resp;
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_start_version(alter_version + 1);
+ req.set_end_version(-1);
+ req.mutable_idx()->set_db_id(1);
+ req.mutable_idx()->set_table_id(table_id);
+ req.mutable_idx()->set_index_id(index_id);
+ req.mutable_idx()->set_partition_id(partition_id);
+ req.mutable_idx()->set_tablet_id(new_tablet_id);
+ req.set_base_compaction_cnt(new_tablet_stats_pb.base_compaction_cnt());
+ req.set_cumulative_point(new_tablet_stats_pb.cumulative_point());
+
req.set_cumulative_compaction_cnt(new_tablet_stats_pb.cumulative_compaction_cnt());
+ brpc::Controller cntl;
+ meta_service->get_rowset(&cntl, &req, &resp, nullptr);
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(resp.rowset_meta_size(), 2);
+ ASSERT_EQ(resp.rowset_meta(0).start_version(), 5);
+ ASSERT_EQ(resp.rowset_meta(0).end_version(), 5);
+ ASSERT_EQ(resp.rowset_meta(1).start_version(), 6);
+ ASSERT_EQ(resp.rowset_meta(1).end_version(), 6);
+ }
}
void check_delete_bitmap_lock(MetaServiceProxy* meta_service, std::string
instance_id,
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a2251fe9fde..69e0369c438 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -293,6 +293,22 @@ void insert_rowset(MetaServiceProxy* meta_service, int64_t
db_id, const std::str
commit_txn(meta_service, db_id, txn_id, label);
}
+void insert_rowsets(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
+ int64_t table_id, int64_t partition_id,
std::vector<int64_t> tablet_ids) {
+ int64_t txn_id = 0;
+ ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service, db_id, label, table_id,
txn_id));
+ for (auto tablet_id : tablet_ids) {
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+ prepare_rowset(meta_service, rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ res.Clear();
+ ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, rowset, res));
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+ }
+ commit_txn(meta_service, db_id, txn_id, label);
+}
+
static void add_tablet_metas(MetaServiceProxy* meta_service, std::string
instance_id,
int64_t table_id, int64_t index_id,
const std::vector<std::array<int64_t, 2>>&
tablet_idxes,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]