This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63937237e33 [fix](cloud) Schema change job handles input rowsets in 
range [2, alter_version] (#55461)
63937237e33 is described below

commit 63937237e333d021198aecc23af857e43f56df63
Author: walter <[email protected]>
AuthorDate: Sun Aug 31 11:57:55 2025 +0800

    [fix](cloud) Schema change job handles input rowsets in range [2, 
alter_version] (#55461)
---
 cloud/src/meta-service/meta_service_job.cpp |  6 +--
 cloud/src/meta-store/meta_reader.h          |  2 +-
 cloud/test/meta_service_job_test.cpp        | 69 +++++++++++++++++++++++++++--
 cloud/test/meta_service_test.cpp            | 16 +++++++
 4 files changed, 86 insertions(+), 7 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 53c9361a633..654a032b6a1 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1707,12 +1707,12 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
         }
     } else {
         std::vector<RowsetMetaCloudPB> rowset_metas;
-        TxnErrorCode err = reader.get_rowset_metas(
-                txn.get(), tablet_id, 2, schema_change.alter_version() + 1, 
&rowset_metas);
+        TxnErrorCode err = reader.get_rowset_metas(txn.get(), tablet_id, 2,
+                                                   
schema_change.alter_version(), &rowset_metas);
         if (err != TxnErrorCode::TXN_OK) {
             code = cast_as<ErrCategory::READ>(err);
             msg = fmt::format("failed to get rowset metas, tablet_id={}, 
start={}, end={}, err={}",
-                              tablet_id, 2, schema_change.alter_version() + 1, 
err);
+                              tablet_id, 2, schema_change.alter_version(), 
err);
             LOG(WARNING) << msg;
             return;
         }
diff --git a/cloud/src/meta-store/meta_reader.h 
b/cloud/src/meta-store/meta_reader.h
index d0d630e0893..121dafda051 100644
--- a/cloud/src/meta-store/meta_reader.h
+++ b/cloud/src/meta-store/meta_reader.h
@@ -152,7 +152,7 @@ public:
                                     std::unordered_map<int64_t, 
TabletIndexPB>* tablet_indexes,
                                     bool snapshot = false);
 
-    // Get the rowset meta for the given tablet_id and version range.
+    // Get the rowset meta for the given tablet_id and version range 
[start_version, end_version].
     //
     // The `rowset_metas` will be filled with the RowsetMetaCloudPB for each 
version in the range,
     // in ascending order.
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index debef00799a..8d7d805aa66 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -51,6 +51,8 @@ extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, 
int64_t tablet_id,
                                               int64_t version, int num_rows);
 extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const 
std::string& label,
                           int64_t table_id, int64_t partition_id, int64_t 
tablet_id);
+extern void insert_rowsets(MetaServiceProxy* meta_service, int64_t db_id, 
const std::string& label,
+                           int64_t table_id, int64_t partition_id, 
std::vector<int64_t> tablet_ids);
 extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t 
index_id,
                        int64_t partition_id, int64_t tablet_id);
 extern void get_tablet_stats(MetaServiceProxy* meta_service, int64_t table_id, 
int64_t index_id,
@@ -1368,7 +1370,6 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         insert_rowset(meta_service.get(), 1, "commit_rowset_1", table_id, 
partition_id, tablet_id);
         insert_rowset(meta_service.get(), 1, "commit_rowset_2", table_id, 
partition_id, tablet_id);
         insert_rowset(meta_service.get(), 1, "commit_rowset_3", table_id, 
partition_id, tablet_id);
-        insert_rowset(meta_service.get(), 1, "commit_rowset_4", table_id, 
partition_id, tablet_id);
     }
 
     auto get_tablet_stats = [&](int64_t tid) -> TabletStatsPB {
@@ -1416,6 +1417,14 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     }
 
+    {
+        // Add more rowsets, to ensure the alter version is accurate.
+        insert_rowsets(meta_service.get(), 1, "commit_rowset_4", table_id, 
partition_id,
+                       {tablet_id, new_tablet_id});
+        insert_rowsets(meta_service.get(), 1, "commit_rowset_5", table_id, 
partition_id,
+                       {tablet_id, new_tablet_id});
+    }
+
     // Create output rowsets for new tablet
     std::vector<doris::RowsetMetaCloudPB> output_rowsets;
     for (int64_t i = 0; i < 3; ++i) {
@@ -1430,6 +1439,7 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
     auto old_tablet_stats_pb = get_tablet_stats(tablet_id);
     auto new_tablet_stats_pb = get_tablet_stats(new_tablet_id);
 
+    int64_t alter_version = output_rowsets.back().end_version();
     {
         // Finish schema change job
         FinishTabletJobRequest req;
@@ -1448,7 +1458,7 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         schema_change->mutable_new_tablet_idx()->set_table_id(table_id);
         schema_change->mutable_new_tablet_idx()->set_index_id(index_id);
         
schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id);
-        schema_change->set_alter_version(output_rowsets.back().end_version());
+        schema_change->set_alter_version(alter_version);
 
         // Set output rowsets info
         for (const auto& rowset : output_rowsets) {
@@ -1470,7 +1480,9 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         auto new_stats = get_tablet_stats(new_tablet_id);
 
         EXPECT_EQ(new_stats.num_rows(),
-                  new_tablet_stats_pb.num_rows() + 
req.job().schema_change().num_output_rows());
+                  new_tablet_stats_pb.num_rows() + 
req.job().schema_change().num_output_rows())
+                << "new_stats => " << new_stats.DebugString() << 
"\nnew_tablet_stats => "
+                << new_tablet_stats_pb.DebugString();
         EXPECT_EQ(new_stats.data_size(), new_tablet_stats_pb.data_size() +
                                                  
req.job().schema_change().size_output_rowsets());
         EXPECT_EQ(new_stats.num_rowsets(), new_tablet_stats_pb.num_rowsets() +
@@ -1485,6 +1497,57 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
                   new_tablet_stats_pb.segment_size() +
                           
req.job().schema_change().segment_size_output_rowsets());
     }
+
+    {
+        // Get the rowset metas of the old tablet
+        GetRowsetRequest req;
+        GetRowsetResponse resp;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_start_version(alter_version + 1);
+        req.set_end_version(-1);
+        req.mutable_idx()->set_db_id(1);
+        req.mutable_idx()->set_table_id(table_id);
+        req.mutable_idx()->set_index_id(index_id);
+        req.mutable_idx()->set_partition_id(partition_id);
+        req.mutable_idx()->set_tablet_id(tablet_id);
+        req.set_base_compaction_cnt(old_tablet_stats_pb.base_compaction_cnt());
+        req.set_cumulative_point(old_tablet_stats_pb.cumulative_point());
+        
req.set_cumulative_compaction_cnt(old_tablet_stats_pb.cumulative_compaction_cnt());
+
+        brpc::Controller cntl;
+        meta_service->get_rowset(&cntl, &req, &resp, nullptr);
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(resp.rowset_meta_size(), 2);
+        ASSERT_EQ(resp.rowset_meta(0).start_version(), 5);
+        ASSERT_EQ(resp.rowset_meta(0).end_version(), 5);
+        ASSERT_EQ(resp.rowset_meta(1).start_version(), 6);
+        ASSERT_EQ(resp.rowset_meta(1).end_version(), 6);
+    }
+
+    {
+        // Get the rowset metas of the new tablet
+        GetRowsetRequest req;
+        GetRowsetResponse resp;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_start_version(alter_version + 1);
+        req.set_end_version(-1);
+        req.mutable_idx()->set_db_id(1);
+        req.mutable_idx()->set_table_id(table_id);
+        req.mutable_idx()->set_index_id(index_id);
+        req.mutable_idx()->set_partition_id(partition_id);
+        req.mutable_idx()->set_tablet_id(new_tablet_id);
+        req.set_base_compaction_cnt(new_tablet_stats_pb.base_compaction_cnt());
+        req.set_cumulative_point(new_tablet_stats_pb.cumulative_point());
+        
req.set_cumulative_compaction_cnt(new_tablet_stats_pb.cumulative_compaction_cnt());
+        brpc::Controller cntl;
+        meta_service->get_rowset(&cntl, &req, &resp, nullptr);
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(resp.rowset_meta_size(), 2);
+        ASSERT_EQ(resp.rowset_meta(0).start_version(), 5);
+        ASSERT_EQ(resp.rowset_meta(0).end_version(), 5);
+        ASSERT_EQ(resp.rowset_meta(1).start_version(), 6);
+        ASSERT_EQ(resp.rowset_meta(1).end_version(), 6);
+    }
 }
 
 void check_delete_bitmap_lock(MetaServiceProxy* meta_service, std::string 
instance_id,
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a2251fe9fde..69e0369c438 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -293,6 +293,22 @@ void insert_rowset(MetaServiceProxy* meta_service, int64_t 
db_id, const std::str
     commit_txn(meta_service, db_id, txn_id, label);
 }
 
+void insert_rowsets(MetaServiceProxy* meta_service, int64_t db_id, const 
std::string& label,
+                    int64_t table_id, int64_t partition_id, 
std::vector<int64_t> tablet_ids) {
+    int64_t txn_id = 0;
+    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service, db_id, label, table_id, 
txn_id));
+    for (auto tablet_id : tablet_ids) {
+        CreateRowsetResponse res;
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+        prepare_rowset(meta_service, rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+        res.Clear();
+        ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, rowset, res));
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    }
+    commit_txn(meta_service, db_id, txn_id, label);
+}
+
 static void add_tablet_metas(MetaServiceProxy* meta_service, std::string 
instance_id,
                              int64_t table_id, int64_t index_id,
                              const std::vector<std::array<int64_t, 2>>& 
tablet_idxes,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to