This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b15d7ef33a6 [improve](cloud) Txn lazy committer commit partitions in parallel (#59291)
b15d7ef33a6 is described below

commit b15d7ef33a63125ad082f843f28cb73435339d97
Author: walter <[email protected]>
AuthorDate: Wed Dec 24 02:56:29 2025 +0800

    [improve](cloud) Txn lazy committer commit partitions in parallel (#59291)
---
 cloud/src/common/bvars.cpp                     |   1 +
 cloud/src/common/bvars.h                       |   1 +
 cloud/src/common/config.h                      |   2 +
 cloud/src/{recycler => common}/sync_executor.h |   0
 cloud/src/meta-service/meta_service_txn.cpp    |   8 +-
 cloud/src/meta-service/txn_lazy_committer.cpp  | 318 +++++++++++++++----------
 cloud/src/meta-service/txn_lazy_committer.h    |  10 +
 cloud/src/recycler/sync_executor.h             | 131 +---------
 8 files changed, 206 insertions(+), 265 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 92c2dc6bd22..0ebd399f23d 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -120,6 +120,7 @@ MBvarLatencyRecorderWithStatus<60> 
g_bvar_instance_txn_commit_with_tablet_count(
 bvar::LatencyRecorder g_bvar_ms_scan_instance_update("ms", 
"scan_instance_update");
 bvar::LatencyRecorder 
g_bvar_txn_lazy_committer_waiting_duration("txn_lazy_committer", "waiting");
 bvar::LatencyRecorder 
g_bvar_txn_lazy_committer_committing_duration("txn_lazy_committer", 
"committing");
+bvar::LatencyRecorder 
g_bvar_txn_lazy_committer_commit_partition_duration("txn_lazy_committer", 
"commit_partition");
 bvar::Adder<int64_t> g_bvar_txn_lazy_committer_submitted("txn_lazy_committer", 
"submitted");
 bvar::Adder<int64_t> g_bvar_txn_lazy_committer_finished("txn_lazy_committer", 
"finished");
 
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 254810b8f28..a5a957df137 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -629,6 +629,7 @@ extern MBvarLatencyRecorderWithStatus<60> 
g_bvar_instance_txn_commit_with_tablet
 extern bvar::LatencyRecorder g_bvar_ms_scan_instance_update;
 extern bvar::LatencyRecorder g_bvar_txn_lazy_committer_waiting_duration;
 extern bvar::LatencyRecorder g_bvar_txn_lazy_committer_committing_duration;
+extern bvar::LatencyRecorder 
g_bvar_txn_lazy_committer_commit_partition_duration;
 extern bvar::Adder<int64_t> g_bvar_txn_lazy_committer_submitted;
 extern bvar::Adder<int64_t> g_bvar_txn_lazy_committer_finished;
 
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index fae1db9b692..0a420d1d007 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -304,6 +304,8 @@ CONF_mInt64(max_txn_commit_byte, "7340032");
 CONF_Bool(enable_cloud_txn_lazy_commit, "true");
 CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000");
 CONF_Int32(txn_lazy_commit_num_threads, "8");
+CONF_mBool(enable_cloud_parallel_txn_lazy_commit, "true");
+CONF_Int32(parallel_txn_lazy_commit_num_threads, "0"); // hardware concurrency if zero.
 CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000");
 CONF_mBool(txn_lazy_commit_shuffle_partitions, "true");
 CONF_Int64(txn_lazy_commit_shuffle_seed, "0"); // 0 means generate a random seed
diff --git a/cloud/src/recycler/sync_executor.h 
b/cloud/src/common/sync_executor.h
similarity index 100%
copy from cloud/src/recycler/sync_executor.h
copy to cloud/src/common/sync_executor.h
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index ba4b1220c2d..5f9b3847d5f 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -2370,10 +2370,10 @@ void MetaServiceImpl::commit_txn_eventually(
                       << " txn_id=" << txn_id;
         }
 
-        VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()
-                   << " num_put_keys=" << txn->num_put_keys()
-                   << " num_del_keys=" << txn->num_del_keys()
-                   << " txn_size=" << txn->approximate_bytes() << " txn_id=" 
<< txn_id;
+        LOG(INFO) << "put_size=" << txn->put_bytes() << " del_size=" << 
txn->delete_bytes()
+                  << " num_put_keys=" << txn->num_put_keys()
+                  << " num_del_keys=" << txn->num_del_keys()
+                  << " txn_size=" << txn->approximate_bytes() << " txn_id=" << 
txn_id;
 
         err = txn->commit();
         if (err != TxnErrorCode::TXN_OK) {
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index bfae164f8c0..b7801c2706d 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -29,6 +29,7 @@
 #include "common/defer.h"
 #include "common/logging.h"
 #include "common/stats.h"
+#include "common/sync_executor.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/meta_service_helper.h"
@@ -633,23 +634,12 @@ void TxnLazyCommitTask::commit() {
             // <partition_id, tmp_rowsets>
             std::map<int64_t, std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>>
                     partition_to_tmp_rowset_metas;
-            size_t max_rowset_meta_size = 0;
             for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) 
{
                 
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
                 
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
                         tmp_rowset_key;
                 
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
                         tmp_rowset_pb;
-                max_rowset_meta_size = std::max(max_rowset_meta_size, 
tmp_rowset_pb.ByteSizeLong());
-            }
-
-            // fdb txn limit 10MB, we use 4MB as the max size for each batch.
-            size_t max_rowsets_per_batch = 
config::txn_lazy_max_rowsets_per_batch;
-            if (max_rowset_meta_size > 0) {
-                max_rowsets_per_batch = std::min((4UL << 20) / 
max_rowset_meta_size,
-                                                 
size_t(config::txn_lazy_max_rowsets_per_batch));
-                
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
-                                         &max_rowsets_per_batch, 
&max_rowset_meta_size);
             }
 
             // Shuffle partition ids to reduce the conflict probability
@@ -662,134 +652,40 @@ void TxnLazyCommitTask::commit() {
                 std::shuffle(partition_ids.begin(), partition_ids.end(), rng);
             }
 
-            for (int64_t partition_id : partition_ids) {
-                auto& tmp_rowset_metas = 
partition_to_tmp_rowset_metas[partition_id];
-
-                // tablet_id -> TabletIndexPB
-                std::map<int64_t, TabletIndexPB> tablet_ids;
-                int64_t table_id = -1;
-                {
-                    DCHECK(tmp_rowset_metas.size() > 0);
-                    int64_t first_tablet_id = 
tmp_rowset_metas.begin()->second.tablet_id();
-                    TabletIndexPB first_tablet_index;
-                    std::tie(code_, msg_) = get_tablet_index(
-                            meta_reader, txn_kv_.get(), instance_id_, txn_id_, 
first_tablet_id,
-                            &first_tablet_index, is_versioned_read);
-                    if (code_ != MetaServiceCode::OK) {
+            if (config::enable_cloud_parallel_txn_lazy_commit) {
+                SyncExecutor<std::pair<MetaServiceCode, std::string>> executor(
+                        txn_lazy_committer_->parallel_commit_pool(),
+                        fmt::format("txn_{}_parallel_commit", txn_id_));
+                for (int64_t partition_id : partition_ids) {
+                    executor.add([&, partition_id, this]() {
+                        return commit_partition(db_id, partition_id,
+                                                
partition_to_tmp_rowset_metas.at(partition_id),
+                                                is_versioned_read, 
is_versioned_write);
+                    });
+                }
+                bool finished = false;
+                auto task_results = executor.when_all(&finished);
+                for (auto&& [code, msg] : task_results) {
+                    if (code != MetaServiceCode::OK) {
+                        code_ = code;
+                        msg_ = std::move(msg);
                         break;
                     }
-                    LOG(INFO) << "first_tablet_index: " << 
first_tablet_index.ShortDebugString();
-                    table_id = first_tablet_index.table_id();
-                    tablet_ids.emplace(first_tablet_id, first_tablet_index);
                 }
-
-                // The partition version key is constructed during the txn 
commit process, so the versionstamp
-                // can be retrieved from the key itself.
-                Versionstamp versionstamp;
-                VersionPB original_partition_version;
-                std::tie(code_, msg_) = get_partition_version(
-                        txn_kv_.get(), instance_id_, txn_id_, db_id, table_id, 
partition_id,
-                        &original_partition_version, &versionstamp, 
is_versioned_read);
-                if (code_ != MetaServiceCode::OK) {
+                if (code_ == MetaServiceCode::OK && !finished) {
+                    LOG(WARNING) << "some partition commit tasks not finished, 
txn_id=" << txn_id_;
+                    code_ = MetaServiceCode::KV_TXN_CONFLICT;
                     break;
-                } else if (original_partition_version.pending_txn_ids_size() 
== 0 ||
-                           original_partition_version.pending_txn_ids(0) != 
txn_id_) {
-                    // The partition version does not contain the target 
pending txn, it might have been committed
-                    LOG(INFO) << "txn_id=" << txn_id_ << " partition_id=" << 
partition_id
-                              << " version has already been converted."
-                              << " version_pb:" << 
original_partition_version.ShortDebugString();
-                    
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::already_been_converted",
-                                             &original_partition_version);
-                    continue;
                 }
-
-                for (size_t i = 0; i < tmp_rowset_metas.size(); i += 
max_rowsets_per_batch) {
-                    size_t end = (i + max_rowsets_per_batch) > 
tmp_rowset_metas.size()
-                                         ? tmp_rowset_metas.size()
-                                         : i + max_rowsets_per_batch;
-                    std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>
-                            
sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
-                                                           
tmp_rowset_metas.begin() + end);
-                    convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code_, 
msg_, db_id,
-                                        sub_partition_tmp_rowset_metas, 
tablet_ids,
-                                        is_versioned_write, is_versioned_read, 
versionstamp,
-                                        
txn_lazy_committer_->resource_manager().get());
+            } else {
+                for (int64_t partition_id : partition_ids) {
+                    std::tie(code_, msg_) = commit_partition(
+                            db_id, partition_id, 
partition_to_tmp_rowset_metas[partition_id],
+                            is_versioned_read, is_versioned_write);
                     if (code_ != MetaServiceCode::OK) break;
                 }
-                if (code_ != MetaServiceCode::OK) break;
-
-                std::unique_ptr<Transaction> txn;
-                TxnErrorCode err = txn_kv_->create_txn(&txn);
-                if (err != TxnErrorCode::TXN_OK) {
-                    code_ = cast_as<ErrCategory::CREATE>(err);
-                    ss << "failed to create txn, txn_id=" << txn_id_ << " 
err=" << err;
-                    msg_ = ss.str();
-                    LOG(WARNING) << msg_;
-                    break;
-                }
-
-                DCHECK(table_id > 0);
-                DCHECK(partition_id > 0);
-
-                // Notice: get the versionstamp again, since it must not be 
changed here.
-                // But if you want to support multi pending txns in the 
partition version, this must be changed.
-                VersionPB version_pb;
-                std::tie(code_, msg_) = get_partition_version(
-                        txn.get(), instance_id_, txn_id_, db_id, table_id, 
partition_id,
-                        &version_pb, &versionstamp, is_versioned_read);
-                if (code_ != MetaServiceCode::OK) {
-                    break;
-                }
-
-                if (version_pb.pending_txn_ids_size() > 0 &&
-                    version_pb.pending_txn_ids(0) == txn_id_) {
-                    DCHECK(version_pb.pending_txn_ids_size() == 1);
-                    version_pb.clear_pending_txn_ids();
-
-                    if (version_pb.has_version()) {
-                        version_pb.set_version(version_pb.version() + 1);
-                    } else {
-                        // first commit txn version is 2
-                        version_pb.set_version(2);
-                    }
-                    std::string ver_key =
-                            partition_version_key({instance_id_, db_id, 
table_id, partition_id});
-                    std::string ver_val;
-                    if (!version_pb.SerializeToString(&ver_val)) {
-                        code_ = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-                        ss << "failed to serialize version_pb when saving, 
txn_id=" << txn_id_;
-                        msg_ = ss.str();
-                        break;
-                    }
-                    txn->put(ver_key, ver_val);
-                    LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id=" 
<< txn_id_
-                              << " version_pb=" << 
version_pb.ShortDebugString();
-                    if (is_versioned_write) {
-                        // Update the partition version with the specified 
versionstamp.
-                        std::string versioned_key =
-                                
versioned::partition_version_key({instance_id_, partition_id});
-                        versioned_put(txn.get(), versioned_key, versionstamp, 
ver_val);
-                        LOG(INFO) << "put versioned ver_key=" << 
hex(versioned_key)
-                                  << " txn_id=" << txn_id_
-                                  << " version_pb=" << 
version_pb.ShortDebugString();
-                    }
-
-                    for (auto& [tmp_rowset_key, tmp_rowset_pb] : 
tmp_rowset_metas) {
-                        txn->remove(tmp_rowset_key);
-                        LOG(INFO) << "remove tmp_rowset_key=" << 
hex(tmp_rowset_key)
-                                  << " txn_id=" << txn_id_;
-                    }
-
-                    TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
-                    err = txn->commit();
-                    if (err != TxnErrorCode::TXN_OK) {
-                        code_ = cast_as<ErrCategory::COMMIT>(err);
-                        ss << "failed to commit kv txn, txn_id=" << txn_id_ << 
" err=" << err;
-                        msg_ = ss.str();
-                        break;
-                    }
-                }
             }
+
             if (code_ != MetaServiceCode::OK) {
                 LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << " 
msg=" << msg_;
                 break;
@@ -800,6 +696,158 @@ void TxnLazyCommitTask::commit() {
              retry_times++ < config::txn_store_retry_times);
 }
 
+std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::commit_partition(
+        int64_t db_id, int64_t partition_id,
+        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowset_metas,
+        bool is_versioned_read, bool is_versioned_write) {
+    std::stringstream ss;
+    CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
+                                 
txn_lazy_committer_->resource_manager().get());
+    MetaServiceCode code;
+    std::string msg;
+
+    StopWatch sw;
+    DORIS_CLOUD_DEFER {
+        g_bvar_txn_lazy_committer_commit_partition_duration << sw.elapsed_us();
+    };
+
+    // tablet_id -> TabletIndexPB
+    std::map<int64_t, TabletIndexPB> tablet_ids;
+    int64_t table_id = -1;
+    {
+        DCHECK(tmp_rowset_metas.size() > 0);
+        int64_t first_tablet_id = tmp_rowset_metas.begin()->second.tablet_id();
+        TabletIndexPB first_tablet_index;
+        std::tie(code, msg) =
+                get_tablet_index(meta_reader, txn_kv_.get(), instance_id_, 
txn_id_, first_tablet_id,
+                                 &first_tablet_index, is_versioned_read);
+        if (code != MetaServiceCode::OK) {
+            return {code, msg};
+        }
+        table_id = first_tablet_index.table_id();
+        tablet_ids.emplace(first_tablet_id, first_tablet_index);
+    }
+
+    // The partition version key is constructed during the txn commit process, so the versionstamp
+    // can be retrieved from the key itself.
+    Versionstamp versionstamp;
+    VersionPB original_partition_version;
+    std::tie(code, msg) = get_partition_version(txn_kv_.get(), instance_id_, 
txn_id_, db_id,
+                                                table_id, partition_id, 
&original_partition_version,
+                                                &versionstamp, 
is_versioned_read);
+    if (code != MetaServiceCode::OK) {
+        return {code, msg};
+    } else if (original_partition_version.pending_txn_ids_size() == 0 ||
+               original_partition_version.pending_txn_ids(0) != txn_id_) {
+        // The partition version does not contain the target pending txn, it might have been committed
+        LOG(INFO) << "txn_id=" << txn_id_ << " partition_id=" << partition_id
+                  << " version has already been converted."
+                  << " version_pb:" << 
original_partition_version.ShortDebugString();
+        
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::already_been_converted",
+                                 &original_partition_version);
+        return {MetaServiceCode::OK, ""};
+    }
+
+    size_t max_rowset_meta_size = 0;
+    for (const auto& [_, tmp_rowset_pb] : tmp_rowset_metas) {
+        max_rowset_meta_size = std::max(max_rowset_meta_size, 
tmp_rowset_pb.ByteSizeLong());
+    }
+
+    // fdb txn limit 10MB, we use 4MB as the max size for each batch.
+    size_t max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch;
+    if (max_rowset_meta_size > 0) {
+        max_rowsets_per_batch = std::min((4UL << 20) / max_rowset_meta_size,
+                                         
size_t(config::txn_lazy_max_rowsets_per_batch));
+        
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
+                                 &max_rowsets_per_batch, 
&max_rowset_meta_size);
+    }
+
+    for (size_t i = 0; i < tmp_rowset_metas.size(); i += 
max_rowsets_per_batch) {
+        size_t end = (i + max_rowsets_per_batch) > tmp_rowset_metas.size()
+                             ? tmp_rowset_metas.size()
+                             : i + max_rowsets_per_batch;
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
+                sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
+                                               tmp_rowset_metas.begin() + end);
+        convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code, msg, db_id,
+                            sub_partition_tmp_rowset_metas, tablet_ids, 
is_versioned_write,
+                            is_versioned_read, versionstamp,
+                            txn_lazy_committer_->resource_manager().get());
+        if (code != MetaServiceCode::OK) {
+            return {code, msg};
+        }
+    }
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, txn_id=" << txn_id_ << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return {code, msg};
+    }
+
+    DCHECK(table_id > 0);
+    DCHECK(partition_id > 0);
+
+    // Notice: get the versionstamp again, since it must not be changed here.
+    // But if you want to support multi pending txns in the partition version, this must be changed.
+    VersionPB version_pb;
+    std::tie(code, msg) =
+            get_partition_version(txn.get(), instance_id_, txn_id_, db_id, 
table_id, partition_id,
+                                  &version_pb, &versionstamp, 
is_versioned_read);
+    if (code != MetaServiceCode::OK) {
+        return {code, msg};
+    }
+
+    if (version_pb.pending_txn_ids_size() > 0 && version_pb.pending_txn_ids(0) 
== txn_id_) {
+        DCHECK(version_pb.pending_txn_ids_size() == 1);
+        version_pb.clear_pending_txn_ids();
+
+        if (version_pb.has_version()) {
+            version_pb.set_version(version_pb.version() + 1);
+        } else {
+            // first commit txn version is 2
+            version_pb.set_version(2);
+        }
+        std::string ver_key = partition_version_key({instance_id_, db_id, 
table_id, partition_id});
+        std::string ver_val;
+        if (!version_pb.SerializeToString(&ver_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize version_pb when saving, txn_id=" << 
txn_id_;
+            msg = ss.str();
+            return {code, msg};
+        }
+        txn->put(ver_key, ver_val);
+        LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id=" << txn_id_
+                  << " version_pb=" << version_pb.ShortDebugString();
+        if (is_versioned_write) {
+            // Update the partition version with the specified versionstamp.
+            std::string versioned_key =
+                    versioned::partition_version_key({instance_id_, 
partition_id});
+            versioned_put(txn.get(), versioned_key, versionstamp, ver_val);
+            LOG(INFO) << "put versioned ver_key=" << hex(versioned_key) << " 
txn_id=" << txn_id_
+                      << " version_pb=" << version_pb.ShortDebugString();
+        }
+
+        for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
+            txn->remove(tmp_rowset_key);
+            LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key) << " 
txn_id=" << txn_id_;
+        }
+
+        TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::COMMIT>(err);
+            ss << "failed to commit kv txn, txn_id=" << txn_id_ << " err=" << 
err;
+            msg = ss.str();
+            return {code, msg};
+        }
+    }
+    return {MetaServiceCode::OK, ""};
+}
+
 std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::wait() {
     constexpr auto WAIT_FOR_MICROSECONDS = 5 * 1000000;
 
@@ -833,6 +881,14 @@ TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv> 
txn_kv,
     worker_pool_ = 
std::make_unique<SimpleThreadPool>(config::txn_lazy_commit_num_threads,
                                                       "txn_lazy_commiter");
     worker_pool_->start();
+
+    int32_t parallel_commit_threads = std::max(0, 
config::parallel_txn_lazy_commit_num_threads);
+    if (parallel_commit_threads == 0) {
+        parallel_commit_threads = std::thread::hardware_concurrency();
+    }
+    parallel_commit_pool_ =
+            std::make_shared<SimpleThreadPool>(parallel_commit_threads, 
"parallel_commit");
+    parallel_commit_pool_->start();
 }
 
 /**
diff --git a/cloud/src/meta-service/txn_lazy_committer.h 
b/cloud/src/meta-service/txn_lazy_committer.h
index 602818801c2..8ab62aff89f 100644
--- a/cloud/src/meta-service/txn_lazy_committer.h
+++ b/cloud/src/meta-service/txn_lazy_committer.h
@@ -21,7 +21,10 @@
 #include <bthread/mutex.h>
 #include <gen_cpp/cloud.pb.h>
 
+#include <memory>
+
 #include "common/simple_thread_pool.h"
+#include "meta-store/clone_chain_reader.h"
 #include "meta-store/txn_kv.h"
 #include "resource-manager/resource_manager.h"
 
@@ -42,6 +45,11 @@ public:
 private:
     friend class TxnLazyCommitter;
 
+    std::pair<MetaServiceCode, std::string> commit_partition(
+            int64_t db_id, int64_t partition_id,
+            const std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
+            bool is_versioned_write, bool is_versioned_read);
+
     std::string instance_id_;
     int64_t txn_id_;
     std::shared_ptr<TxnKv> txn_kv_;
@@ -61,12 +69,14 @@ public:
     void remove(int64_t txn_id);
 
     std::shared_ptr<ResourceManager>& resource_manager() { return 
resource_mgr_; }
+    std::shared_ptr<SimpleThreadPool>& parallel_commit_pool() { return 
parallel_commit_pool_; }
 
 private:
     std::shared_ptr<TxnKv> txn_kv_;
     std::shared_ptr<ResourceManager> resource_mgr_;
 
     std::unique_ptr<SimpleThreadPool> worker_pool_;
+    std::shared_ptr<SimpleThreadPool> parallel_commit_pool_;
 
     std::mutex mutex_;
     // <txn_id, TxnLazyCommitTask>
diff --git a/cloud/src/recycler/sync_executor.h 
b/cloud/src/recycler/sync_executor.h
index 95650e5316a..77e0f2dc9ab 100644
--- a/cloud/src/recycler/sync_executor.h
+++ b/cloud/src/recycler/sync_executor.h
@@ -17,133 +17,4 @@
 
 #pragma once
 
-#include <bthread/countdown_event.h>
-#include <cpp/sync_point.h>
-#include <fmt/core.h>
-#include <gen_cpp/cloud.pb.h>
-#include <glog/logging.h>
-
-#include <chrono>
-#include <future>
-#include <string>
-
-#include "common/defer.h"
-#include "common/simple_thread_pool.h"
-
-namespace doris::cloud {
-
-template <typename T>
-class SyncExecutor {
-public:
-    SyncExecutor(
-            std::shared_ptr<SimpleThreadPool> pool, std::string name_tag,
-            std::function<bool(const T&)> cancel = [](const T& /**/) { return 
false; })
-            : _pool(std::move(pool)), _cancel(std::move(cancel)), 
_name_tag(std::move(name_tag)) {}
-    auto add(std::function<T()> callback) -> SyncExecutor<T>& {
-        auto task = std::make_unique<Task>(std::move(callback), _cancel, 
_count);
-        _count.add_count();
-        // The actual task logic would be wrapped by one promise and passed to 
the threadpool.
-        // The result would be returned by the future once the task is 
finished.
-        // Or the task would be invalid if the whole task is cancelled.
-        int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); });
-        CHECK(r == 0);
-        _res.emplace_back(std::move(task));
-        return *this;
-    }
-    std::vector<T> when_all(bool* finished) {
-        DORIS_CLOUD_DEFER {
-            _reset();
-        };
-        timespec current_time;
-        auto current_time_second = time(nullptr);
-        current_time.tv_sec = current_time_second + 300;
-        current_time.tv_nsec = 0;
-        // Wait for all tasks to complete
-        while (0 != _count.timed_wait(current_time)) {
-            current_time.tv_sec += 300;
-            LOG(WARNING) << _name_tag << " has already taken 5 min, cost: "
-                         << time(nullptr) - current_time_second << " seconds";
-        }
-        *finished = !_stop_token;
-        std::vector<T> res;
-        res.reserve(_res.size());
-        for (auto& task : _res) {
-            if (!task->valid()) {
-                *finished = false;
-                return res;
-            }
-            size_t max_wait_ms = 10000;
-            TEST_SYNC_POINT_CALLBACK("SyncExecutor::when_all.set_wait_time", 
&max_wait_ms);
-            // _count.timed_wait has already ensured that all tasks are 
completed.
-            // The 10 seconds here is just waiting for the task results to be 
returned,
-            // so 10 seconds is more than enough.
-            auto status = task->wait_for(max_wait_ms);
-            if (status == std::future_status::ready) {
-                res.emplace_back(task->get());
-            } else {
-                *finished = false;
-                LOG(WARNING) << _name_tag << " task timed out after 10 
seconds";
-                return res;
-            }
-        }
-        return res;
-    }
-
-private:
-    void _reset() {
-        _count.reset(0);
-        _res.clear();
-        _stop_token = false;
-    }
-
-private:
-    class Task {
-    public:
-        Task(std::function<T()> callback, std::function<bool(const T&)> cancel,
-             bthread::CountdownEvent& count)
-                : _callback(std::move(callback)),
-                  _cancel(std::move(cancel)),
-                  _count(count),
-                  _fut(_pro.get_future()) {}
-        void operator()(std::atomic_bool& stop_token) {
-            DORIS_CLOUD_DEFER {
-                _count.signal();
-            };
-            if (stop_token) {
-                _valid = false;
-                return;
-            }
-            T t = _callback();
-            // We'll return this task result to user even if this task return 
error
-            // So we don't set _valid to false here
-            if (_cancel(t)) {
-                stop_token = true;
-            }
-            _pro.set_value(std::move(t));
-        }
-        std::future_status wait_for(size_t milliseconds) {
-            return _fut.wait_for(std::chrono::milliseconds(milliseconds));
-        }
-        bool valid() { return _valid; }
-        T get() { return _fut.get(); }
-
-    private:
-        // It's guarantted that the valid function can only be called inside 
SyncExecutor's `when_all()` function
-        // and only be called when the _count.timed_wait function returned. So 
there would be no data race for
-        // _valid then it doesn't need to be one atomic bool.
-        bool _valid = true;
-        std::function<T()> _callback;
-        std::function<bool(const T&)> _cancel;
-        std::promise<T> _pro;
-        bthread::CountdownEvent& _count;
-        std::future<T> _fut;
-    };
-    std::vector<std::unique_ptr<Task>> _res;
-    // use CountdownEvent to do periodically log using 
CountdownEvent::time_wait()
-    bthread::CountdownEvent _count {0};
-    std::atomic_bool _stop_token {false};
-    std::shared_ptr<SimpleThreadPool> _pool;
-    std::function<bool(const T&)> _cancel;
-    std::string _name_tag;
-};
-} // namespace doris::cloud
\ No newline at end of file
+#include "common/sync_executor.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to