This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e72f6e3e940 [feat](cloud) process_schema_change_job supports versioned read and save min read version (#55313)
e72f6e3e940 is described below

commit e72f6e3e940323cfb0c7a6165502bf596a9253d2
Author: abmdocrt <[email protected]>
AuthorDate: Wed Aug 27 09:03:36 2025 +0800

    [feat](cloud) process_schema_change_job supports versioned read and save min read version (#55313)
---
 cloud/src/meta-service/meta_service_job.cpp | 170 ++++++++++++++++++----------
 cloud/test/meta_service_job_test.cpp        | 140 +++++++++++++++++++++++
 2 files changed, 250 insertions(+), 60 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 12c45a6759f..53c9361a633 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -17,6 +17,7 @@
 
 #include <brpc/closure_guard.h>
 #include <brpc/controller.h>
+#include <fmt/core.h>
 #include <gen_cpp/cloud.pb.h>
 #include <gen_cpp/olap_file.pb.h>
 #include <glog/logging.h>
@@ -1339,6 +1340,41 @@ void schema_change_update_tablet_stats(const TabletSchemaChangeJobPB& schema_cha
                                                      
segment_size_remove_rowsets));
 }
 
+std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
+        Transaction* txn, std::string_view instance_id, int64_t new_tablet_id,
+        std::string& rs_start, std::string& rs_end, auto&& callback) {
+    std::unique_ptr<RangeGetIterator> it;
+    auto rs_start1 = rs_start;
+    do {
+        TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            return {MetaServiceCode::KV_TXN_GET_ERR,
+                    fmt::format(
+                            "internal error, failed to get rowset range, 
err={} new_tablet_id={} "
+                            "range=[{}, {})",
+                            err, new_tablet_id, hex(rs_start), hex(rs_end))};
+        }
+
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+
+            doris::RowsetMetaCloudPB rs;
+            if (!rs.ParseFromArray(v.data(), v.size())) {
+                return {MetaServiceCode::PROTOBUF_PARSE_ERR,
+                        fmt::format("malformed rowset meta, unable to 
deserialize, "
+                                    "new_tablet_id={} key={}",
+                                    new_tablet_id, hex(k))};
+            }
+
+            callback(std::move(rs));
+
+            if (!it->has_next()) rs_start1 = k;
+        }
+        rs_start1.push_back('\x00'); // Update to next smallest key for 
iteration
+    } while (it->more());
+    return {MetaServiceCode::OK, ""};
+}
+
 void process_schema_change_job(MetaServiceCode& code, std::string& msg, 
std::stringstream& ss,
                                std::unique_ptr<Transaction>& txn,
                                const FinishTabletJobRequest* request,
@@ -1638,56 +1674,52 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
 
     auto rs_start = meta_rowset_key({instance_id, new_tablet_id, 2});
     auto rs_end = meta_rowset_key({instance_id, new_tablet_id, 
schema_change.alter_version() + 1});
-    std::unique_ptr<RangeGetIterator> it;
-    auto rs_start1 = rs_start;
-    do {
-        TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
+    auto handle_schema_change_input_rowset_meta = [&](doris::RowsetMetaCloudPB 
rs) {
+        num_remove_rows += rs.num_rows();
+        size_remove_rowsets += rs.total_disk_size();
+        ++num_remove_rowsets;
+        num_remove_segments += rs.num_segments();
+        index_size_remove_rowsets += rs.index_disk_size();
+        segment_size_remove_rowsets += rs.data_disk_size();
+
+        auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, 
rs.rowset_id_v2()});
+        RecycleRowsetPB recycle_rowset;
+        recycle_rowset.set_creation_time(now);
+        recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
+        recycle_rowset.set_type(RecycleRowsetPB::DROP);
+        if (is_versioned_write) {
+            schema_change_log.add_recycle_rowsets()->Swap(&recycle_rowset);
+        } else {
+            auto recycle_val = recycle_rowset.SerializeAsString();
+            txn->put(recycle_key, recycle_val);
+        }
+        INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" << 
new_tablet_id
+                           << " key=" << hex(recycle_key);
+    };
+
+    if (!is_versioned_read) {
+        std::tie(code, msg) =
+                scan_schema_change_input_rowsets(txn.get(), instance_id, 
new_tablet_id, rs_start,
+                                                 rs_end, 
handle_schema_change_input_rowset_meta);
+        if (code != MetaServiceCode::OK) {
+            LOG(WARNING) << msg;
+            return;
+        }
+    } else {
+        std::vector<RowsetMetaCloudPB> rowset_metas;
+        TxnErrorCode err = reader.get_rowset_metas(
+                txn.get(), tablet_id, 2, schema_change.alter_version() + 1, 
&rowset_metas);
         if (err != TxnErrorCode::TXN_OK) {
-            code = MetaServiceCode::KV_TXN_GET_ERR;
-            SS << "internal error, failed to get rowset range, err=" << err
-               << " tablet_id=" << new_tablet_id << " range=[" << 
hex(rs_start1) << ", << "
-               << hex(rs_end) << ")";
-            msg = ss.str();
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get rowset metas, tablet_id={}, 
start={}, end={}, err={}",
+                              tablet_id, 2, schema_change.alter_version() + 1, 
err);
+            LOG(WARNING) << msg;
             return;
         }
-
-        while (it->has_next()) {
-            auto [k, v] = it->next();
-
-            doris::RowsetMetaCloudPB rs;
-            if (!rs.ParseFromArray(v.data(), v.size())) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                SS << "malformed rowset meta, unable to deserialize, 
tablet_id=" << new_tablet_id
-                   << " key=" << hex(k);
-                msg = ss.str();
-                return;
-            }
-
-            num_remove_rows += rs.num_rows();
-            size_remove_rowsets += rs.total_disk_size();
-            ++num_remove_rowsets;
-            num_remove_segments += rs.num_segments();
-            index_size_remove_rowsets += rs.index_disk_size();
-            segment_size_remove_rowsets += rs.data_disk_size();
-
-            auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, 
rs.rowset_id_v2()});
-            RecycleRowsetPB recycle_rowset;
-            recycle_rowset.set_creation_time(now);
-            recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
-            recycle_rowset.set_type(RecycleRowsetPB::DROP);
-            if (is_versioned_write) {
-                schema_change_log.add_recycle_rowsets()->Swap(&recycle_rowset);
-            } else {
-                auto recycle_val = recycle_rowset.SerializeAsString();
-                txn->put(recycle_key, recycle_val);
-            }
-            INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" << 
new_tablet_id
-                               << " key=" << hex(recycle_key);
-
-            if (!it->has_next()) rs_start1 = k;
+        for (auto&& rowset_meta : rowset_metas) {
+            handle_schema_change_input_rowset_meta(std::move(rowset_meta));
         }
-        rs_start1.push_back('\x00'); // Update to next smallest key for 
iteration
-    } while (it->more());
+    }
 
     txn->remove(rs_start, rs_end);
 
@@ -1696,26 +1728,41 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
     
//==========================================================================
     auto stats = response->mutable_stats();
     TabletStats detached_stats;
-    // ATTN: The condition that snapshot read can be used to get tablet stats 
is: all other transactions that put tablet stats
-    //  can make read write conflicts with this transaction on other keys. 
Currently, if all meta-service nodes are running
-    //  with `config::split_tablet_stats = true` can meet the condition.
-    internal_get_tablet_stats(code, msg, txn.get(), instance_id, 
new_tablet_idx, *stats,
-                              detached_stats, 
config::snapshot_get_tablet_stats);
-    if (code != MetaServiceCode::OK) {
-        LOG_WARNING("failed to get tablet stats")
-                .tag("instance_id", instance_id)
-                .tag("tablet_id", tablet_id)
-                .tag("code", code)
-                .tag("msg", msg);
-        return;
+    if (is_versioned_read) {
+        TxnErrorCode err = reader.get_tablet_load_stats(txn.get(), tablet_id, 
stats, nullptr, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            msg = fmt::format("failed to get tablet stats, tablet_id={}, 
err={}", tablet_id, err);
+            LOG(WARNING) << msg;
+            return;
+        }
+    } else {
+        // ATTN: The condition that snapshot read can be used to get tablet 
stats is: all other transactions that put tablet stats
+        //  can make read write conflicts with this transaction on other keys. 
Currently, if all meta-service nodes are running
+        //  with `config::split_tablet_stats = true` can meet the condition.
+        internal_get_tablet_stats(code, msg, txn.get(), instance_id, 
new_tablet_idx, *stats,
+                                  detached_stats, 
config::snapshot_get_tablet_stats);
+        if (code != MetaServiceCode::OK) {
+            LOG_WARNING("failed to get tablet stats")
+                    .tag("instance_id", instance_id)
+                    .tag("tablet_id", tablet_id)
+                    .tag("code", code)
+                    .tag("msg", msg);
+            return;
+        }
     }
     if (is_versioned_write) {
         // read new TabletLoadStatsKey -> TabletStatsPB
         TabletStatsPB new_tablet_load_stats;
         MetaReader meta_reader(instance_id, txn_kv);
         Versionstamp* versionstamp = nullptr;
-        TxnErrorCode err = meta_reader.get_tablet_load_stats(
-                txn.get(), new_tablet_id, &new_tablet_load_stats, 
versionstamp, false);
+        TxnErrorCode err = TxnErrorCode::TXN_OK;
+        if (is_versioned_read) {
+            new_tablet_load_stats.CopyFrom(*stats);
+        } else {
+            err = meta_reader.get_tablet_load_stats(txn.get(), new_tablet_id,
+                                                    &new_tablet_load_stats, 
versionstamp, false);
+        }
         if (err == TxnErrorCode::TXN_OK) {
             // new_tablet_load_stats exists, update TabletStatsPB
             schema_change_update_tablet_stats(
@@ -1851,6 +1898,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
         std::string operation_log_key = versioned::log_key({instance_id});
         std::string operation_log_value;
         OperationLogPB operation_log;
+        if (is_versioned_read) {
+            operation_log.set_min_timestamp(reader.min_read_version());
+        }
         operation_log.mutable_schema_change()->Swap(&schema_change_log);
         if (!operation_log.SerializeToString(&operation_log_value)) {
             code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index b8b92364351..debef00799a 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1347,6 +1347,146 @@ TEST(MetaServiceJobVersionedReadTest, CompactionJobTest) {
     }
 }
 
+TEST(MetaServiceJobVersionedReadTest, SchemaChangeJobTest) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t table_id = 1, index_id = 2, partition_id = 3, tablet_id = 4;
+    int64_t new_tablet_id = 14;
+    {
+        // Create tablets
+        create_tablet(meta_service.get(), table_id, index_id, partition_id, 
tablet_id, false);
+        create_tablet(meta_service.get(), table_id, index_id, partition_id, 
new_tablet_id, false,
+                      true);
+    }
+
+    {
+        // Create rowsets for old tablet
+        insert_rowset(meta_service.get(), 1, "commit_rowset_1", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_2", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_3", table_id, 
partition_id, tablet_id);
+        insert_rowset(meta_service.get(), 1, "commit_rowset_4", table_id, 
partition_id, tablet_id);
+    }
+
+    auto get_tablet_stats = [&](int64_t tid) -> TabletStatsPB {
+        GetTabletStatsRequest get_tablet_stats_req;
+        get_tablet_stats_req.set_cloud_unique_id(cloud_unique_id);
+        auto* tablet_idx = get_tablet_stats_req.add_tablet_idx();
+        tablet_idx->set_tablet_id(tid);
+        tablet_idx->set_db_id(1);
+        tablet_idx->set_index_id(index_id);
+        tablet_idx->set_partition_id(partition_id);
+        tablet_idx->set_table_id(table_id);
+        GetTabletStatsResponse get_tablet_stats_resp;
+        brpc::Controller cntl;
+        meta_service->get_tablet_stats(&cntl, &get_tablet_stats_req, 
&get_tablet_stats_resp,
+                                       nullptr);
+        EXPECT_EQ(get_tablet_stats_resp.status().code(), MetaServiceCode::OK);
+        EXPECT_EQ(get_tablet_stats_resp.tablet_stats_size(), 1);
+        return get_tablet_stats_resp.tablet_stats(0);
+    };
+
+    std::string job_id = "schema_change_job_1";
+    int64_t txn_id = 123456;
+
+    {
+        // Start schema change job
+        StartTabletJobRequest req;
+        StartTabletJobResponse res;
+
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+        req.mutable_job()->mutable_idx()->set_table_id(table_id);
+        req.mutable_job()->mutable_idx()->set_index_id(index_id);
+        req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
+
+        auto* schema_change = req.mutable_job()->mutable_schema_change();
+        schema_change->set_id(job_id);
+        schema_change->set_initiator("ip:port");
+        schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id);
+        schema_change->mutable_new_tablet_idx()->set_table_id(table_id);
+        schema_change->mutable_new_tablet_idx()->set_index_id(index_id);
+        
schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id);
+        schema_change->set_expiration(time(nullptr) + 12);
+
+        brpc::Controller cntl;
+        meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    // Create output rowsets for new tablet
+    std::vector<doris::RowsetMetaCloudPB> output_rowsets;
+    for (int64_t i = 0; i < 3; ++i) {
+        auto rowset = create_rowset(new_tablet_id, i + 2, i + 2, 100);
+        rowset.set_txn_id(txn_id + i);
+        output_rowsets.push_back(rowset);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), output_rowsets.back(), res, txn_id + 
i);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    auto old_tablet_stats_pb = get_tablet_stats(tablet_id);
+    auto new_tablet_stats_pb = get_tablet_stats(new_tablet_id);
+
+    {
+        // Finish schema change job
+        FinishTabletJobRequest req;
+        FinishTabletJobResponse res;
+
+        req.set_action(FinishTabletJobRequest::COMMIT);
+        req.mutable_job()->mutable_idx()->set_table_id(table_id);
+        req.mutable_job()->mutable_idx()->set_index_id(index_id);
+        req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+
+        auto* schema_change = req.mutable_job()->mutable_schema_change();
+        schema_change->set_id(job_id);
+        schema_change->set_initiator("ip:port");
+        schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id);
+        schema_change->mutable_new_tablet_idx()->set_table_id(table_id);
+        schema_change->mutable_new_tablet_idx()->set_index_id(index_id);
+        
schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id);
+        schema_change->set_alter_version(output_rowsets.back().end_version());
+
+        // Set output rowsets info
+        for (const auto& rowset : output_rowsets) {
+            schema_change->add_txn_ids(rowset.txn_id());
+            schema_change->add_output_versions(rowset.end_version());
+        }
+        schema_change->set_num_output_rows(300);
+        schema_change->set_num_output_rowsets(3);
+        schema_change->set_num_output_segments(3);
+        schema_change->set_size_output_rowsets(300 * 110);
+        schema_change->set_index_size_output_rowsets(300 * 10);
+        schema_change->set_segment_size_output_rowsets(300 * 110);
+
+        brpc::Controller cntl;
+        meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+        // Verify tablet stats are updated correctly
+        auto new_stats = get_tablet_stats(new_tablet_id);
+
+        EXPECT_EQ(new_stats.num_rows(),
+                  new_tablet_stats_pb.num_rows() + 
req.job().schema_change().num_output_rows());
+        EXPECT_EQ(new_stats.data_size(), new_tablet_stats_pb.data_size() +
+                                                 
req.job().schema_change().size_output_rowsets());
+        EXPECT_EQ(new_stats.num_rowsets(), new_tablet_stats_pb.num_rowsets() +
+                                                   
req.job().schema_change().num_output_rowsets());
+        EXPECT_EQ(new_stats.num_segments(),
+                  new_tablet_stats_pb.num_segments() +
+                          req.job().schema_change().num_output_segments());
+        EXPECT_EQ(new_stats.index_size(),
+                  new_tablet_stats_pb.index_size() +
+                          
req.job().schema_change().index_size_output_rowsets());
+        EXPECT_EQ(new_stats.segment_size(),
+                  new_tablet_stats_pb.segment_size() +
+                          
req.job().schema_change().segment_size_output_rowsets());
+    }
+}
+
 void check_delete_bitmap_lock(MetaServiceProxy* meta_service, std::string 
instance_id,
                               int64_t table_id, int64_t lock_id, bool exist) {
     std::unique_ptr<Transaction> txn;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to