This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b15d7ef33a6 [improve](cloud) Txn lazy committer commit partitions in
parallel (#59291)
b15d7ef33a6 is described below
commit b15d7ef33a63125ad082f843f28cb73435339d97
Author: walter <[email protected]>
AuthorDate: Wed Dec 24 02:56:29 2025 +0800
[improve](cloud) Txn lazy committer commit partitions in parallel (#59291)
---
cloud/src/common/bvars.cpp | 1 +
cloud/src/common/bvars.h | 1 +
cloud/src/common/config.h | 2 +
cloud/src/{recycler => common}/sync_executor.h | 0
cloud/src/meta-service/meta_service_txn.cpp | 8 +-
cloud/src/meta-service/txn_lazy_committer.cpp | 318 +++++++++++++++----------
cloud/src/meta-service/txn_lazy_committer.h | 10 +
cloud/src/recycler/sync_executor.h | 131 +---------
8 files changed, 206 insertions(+), 265 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 92c2dc6bd22..0ebd399f23d 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -120,6 +120,7 @@ MBvarLatencyRecorderWithStatus<60>
g_bvar_instance_txn_commit_with_tablet_count(
bvar::LatencyRecorder g_bvar_ms_scan_instance_update("ms",
"scan_instance_update");
bvar::LatencyRecorder
g_bvar_txn_lazy_committer_waiting_duration("txn_lazy_committer", "waiting");
bvar::LatencyRecorder
g_bvar_txn_lazy_committer_committing_duration("txn_lazy_committer",
"committing");
+bvar::LatencyRecorder
g_bvar_txn_lazy_committer_commit_partition_duration("txn_lazy_committer",
"commit_partition");
bvar::Adder<int64_t> g_bvar_txn_lazy_committer_submitted("txn_lazy_committer",
"submitted");
bvar::Adder<int64_t> g_bvar_txn_lazy_committer_finished("txn_lazy_committer",
"finished");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 254810b8f28..a5a957df137 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -629,6 +629,7 @@ extern MBvarLatencyRecorderWithStatus<60>
g_bvar_instance_txn_commit_with_tablet
extern bvar::LatencyRecorder g_bvar_ms_scan_instance_update;
extern bvar::LatencyRecorder g_bvar_txn_lazy_committer_waiting_duration;
extern bvar::LatencyRecorder g_bvar_txn_lazy_committer_committing_duration;
+extern bvar::LatencyRecorder
g_bvar_txn_lazy_committer_commit_partition_duration;
extern bvar::Adder<int64_t> g_bvar_txn_lazy_committer_submitted;
extern bvar::Adder<int64_t> g_bvar_txn_lazy_committer_finished;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index fae1db9b692..0a420d1d007 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -304,6 +304,8 @@ CONF_mInt64(max_txn_commit_byte, "7340032");
CONF_Bool(enable_cloud_txn_lazy_commit, "true");
CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000");
CONF_Int32(txn_lazy_commit_num_threads, "8");
+CONF_mBool(enable_cloud_parallel_txn_lazy_commit, "true");
+CONF_Int32(parallel_txn_lazy_commit_num_threads, "0"); // hardware concurrency
if zero.
CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000");
CONF_mBool(txn_lazy_commit_shuffle_partitions, "true");
CONF_Int64(txn_lazy_commit_shuffle_seed, "0"); // 0 means generate a random
seed
diff --git a/cloud/src/recycler/sync_executor.h
b/cloud/src/common/sync_executor.h
similarity index 100%
copy from cloud/src/recycler/sync_executor.h
copy to cloud/src/common/sync_executor.h
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index ba4b1220c2d..5f9b3847d5f 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -2370,10 +2370,10 @@ void MetaServiceImpl::commit_txn_eventually(
<< " txn_id=" << txn_id;
}
- VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" <<
txn->delete_bytes()
- << " num_put_keys=" << txn->num_put_keys()
- << " num_del_keys=" << txn->num_del_keys()
- << " txn_size=" << txn->approximate_bytes() << " txn_id="
<< txn_id;
+ LOG(INFO) << "put_size=" << txn->put_bytes() << " del_size=" <<
txn->delete_bytes()
+ << " num_put_keys=" << txn->num_put_keys()
+ << " num_del_keys=" << txn->num_del_keys()
+ << " txn_size=" << txn->approximate_bytes() << " txn_id=" <<
txn_id;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index bfae164f8c0..b7801c2706d 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -29,6 +29,7 @@
#include "common/defer.h"
#include "common/logging.h"
#include "common/stats.h"
+#include "common/sync_executor.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service_helper.h"
@@ -633,23 +634,12 @@ void TxnLazyCommitTask::commit() {
// <partition_id, tmp_rowsets>
std::map<int64_t, std::vector<std::pair<std::string,
doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
- size_t max_rowset_meta_size = 0;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas)
{
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
tmp_rowset_pb;
- max_rowset_meta_size = std::max(max_rowset_meta_size,
tmp_rowset_pb.ByteSizeLong());
- }
-
- // fdb txn limit 10MB, we use 4MB as the max size for each batch.
- size_t max_rowsets_per_batch =
config::txn_lazy_max_rowsets_per_batch;
- if (max_rowset_meta_size > 0) {
- max_rowsets_per_batch = std::min((4UL << 20) /
max_rowset_meta_size,
-
size_t(config::txn_lazy_max_rowsets_per_batch));
-
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
- &max_rowsets_per_batch,
&max_rowset_meta_size);
}
// Shuffle partition ids to reduce the conflict probability
@@ -662,134 +652,40 @@ void TxnLazyCommitTask::commit() {
std::shuffle(partition_ids.begin(), partition_ids.end(), rng);
}
- for (int64_t partition_id : partition_ids) {
- auto& tmp_rowset_metas =
partition_to_tmp_rowset_metas[partition_id];
-
- // tablet_id -> TabletIndexPB
- std::map<int64_t, TabletIndexPB> tablet_ids;
- int64_t table_id = -1;
- {
- DCHECK(tmp_rowset_metas.size() > 0);
- int64_t first_tablet_id =
tmp_rowset_metas.begin()->second.tablet_id();
- TabletIndexPB first_tablet_index;
- std::tie(code_, msg_) = get_tablet_index(
- meta_reader, txn_kv_.get(), instance_id_, txn_id_,
first_tablet_id,
- &first_tablet_index, is_versioned_read);
- if (code_ != MetaServiceCode::OK) {
+ if (config::enable_cloud_parallel_txn_lazy_commit) {
+ SyncExecutor<std::pair<MetaServiceCode, std::string>> executor(
+ txn_lazy_committer_->parallel_commit_pool(),
+ fmt::format("txn_{}_parallel_commit", txn_id_));
+ for (int64_t partition_id : partition_ids) {
+ executor.add([&, partition_id, this]() {
+ return commit_partition(db_id, partition_id,
+
partition_to_tmp_rowset_metas.at(partition_id),
+ is_versioned_read,
is_versioned_write);
+ });
+ }
+ bool finished = false;
+ auto task_results = executor.when_all(&finished);
+ for (auto&& [code, msg] : task_results) {
+ if (code != MetaServiceCode::OK) {
+ code_ = code;
+ msg_ = std::move(msg);
break;
}
- LOG(INFO) << "first_tablet_index: " <<
first_tablet_index.ShortDebugString();
- table_id = first_tablet_index.table_id();
- tablet_ids.emplace(first_tablet_id, first_tablet_index);
}
-
- // The partition version key is constructed during the txn
commit process, so the versionstamp
- // can be retrieved from the key itself.
- Versionstamp versionstamp;
- VersionPB original_partition_version;
- std::tie(code_, msg_) = get_partition_version(
- txn_kv_.get(), instance_id_, txn_id_, db_id, table_id,
partition_id,
- &original_partition_version, &versionstamp,
is_versioned_read);
- if (code_ != MetaServiceCode::OK) {
+ if (code_ == MetaServiceCode::OK && !finished) {
+ LOG(WARNING) << "some partition commit tasks not finished,
txn_id=" << txn_id_;
+ code_ = MetaServiceCode::KV_TXN_CONFLICT;
break;
- } else if (original_partition_version.pending_txn_ids_size()
== 0 ||
- original_partition_version.pending_txn_ids(0) !=
txn_id_) {
- // The partition version does not contain the target
pending txn, it might have been committed
- LOG(INFO) << "txn_id=" << txn_id_ << " partition_id=" <<
partition_id
- << " version has already been converted."
- << " version_pb:" <<
original_partition_version.ShortDebugString();
-
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::already_been_converted",
- &original_partition_version);
- continue;
}
-
- for (size_t i = 0; i < tmp_rowset_metas.size(); i +=
max_rowsets_per_batch) {
- size_t end = (i + max_rowsets_per_batch) >
tmp_rowset_metas.size()
- ? tmp_rowset_metas.size()
- : i + max_rowsets_per_batch;
- std::vector<std::pair<std::string,
doris::RowsetMetaCloudPB>>
-
sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
-
tmp_rowset_metas.begin() + end);
- convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code_,
msg_, db_id,
- sub_partition_tmp_rowset_metas,
tablet_ids,
- is_versioned_write, is_versioned_read,
versionstamp,
-
txn_lazy_committer_->resource_manager().get());
+ } else {
+ for (int64_t partition_id : partition_ids) {
+ std::tie(code_, msg_) = commit_partition(
+ db_id, partition_id,
partition_to_tmp_rowset_metas[partition_id],
+ is_versioned_read, is_versioned_write);
if (code_ != MetaServiceCode::OK) break;
}
- if (code_ != MetaServiceCode::OK) break;
-
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- code_ = cast_as<ErrCategory::CREATE>(err);
- ss << "failed to create txn, txn_id=" << txn_id_ << "
err=" << err;
- msg_ = ss.str();
- LOG(WARNING) << msg_;
- break;
- }
-
- DCHECK(table_id > 0);
- DCHECK(partition_id > 0);
-
- // Notice: get the versionstamp again, since it must not be
changed here.
- // But if you want to support multi pending txns in the
partition version, this must be changed.
- VersionPB version_pb;
- std::tie(code_, msg_) = get_partition_version(
- txn.get(), instance_id_, txn_id_, db_id, table_id,
partition_id,
- &version_pb, &versionstamp, is_versioned_read);
- if (code_ != MetaServiceCode::OK) {
- break;
- }
-
- if (version_pb.pending_txn_ids_size() > 0 &&
- version_pb.pending_txn_ids(0) == txn_id_) {
- DCHECK(version_pb.pending_txn_ids_size() == 1);
- version_pb.clear_pending_txn_ids();
-
- if (version_pb.has_version()) {
- version_pb.set_version(version_pb.version() + 1);
- } else {
- // first commit txn version is 2
- version_pb.set_version(2);
- }
- std::string ver_key =
- partition_version_key({instance_id_, db_id,
table_id, partition_id});
- std::string ver_val;
- if (!version_pb.SerializeToString(&ver_val)) {
- code_ = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize version_pb when saving,
txn_id=" << txn_id_;
- msg_ = ss.str();
- break;
- }
- txn->put(ver_key, ver_val);
- LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id="
<< txn_id_
- << " version_pb=" <<
version_pb.ShortDebugString();
- if (is_versioned_write) {
- // Update the partition version with the specified
versionstamp.
- std::string versioned_key =
-
versioned::partition_version_key({instance_id_, partition_id});
- versioned_put(txn.get(), versioned_key, versionstamp,
ver_val);
- LOG(INFO) << "put versioned ver_key=" <<
hex(versioned_key)
- << " txn_id=" << txn_id_
- << " version_pb=" <<
version_pb.ShortDebugString();
- }
-
- for (auto& [tmp_rowset_key, tmp_rowset_pb] :
tmp_rowset_metas) {
- txn->remove(tmp_rowset_key);
- LOG(INFO) << "remove tmp_rowset_key=" <<
hex(tmp_rowset_key)
- << " txn_id=" << txn_id_;
- }
-
- TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
- err = txn->commit();
- if (err != TxnErrorCode::TXN_OK) {
- code_ = cast_as<ErrCategory::COMMIT>(err);
- ss << "failed to commit kv txn, txn_id=" << txn_id_ <<
" err=" << err;
- msg_ = ss.str();
- break;
- }
- }
}
+
if (code_ != MetaServiceCode::OK) {
LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << "
msg=" << msg_;
break;
@@ -800,6 +696,158 @@ void TxnLazyCommitTask::commit() {
retry_times++ < config::txn_store_retry_times);
}
+std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::commit_partition(
+ int64_t db_id, int64_t partition_id,
+ const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowset_metas,
+ bool is_versioned_read, bool is_versioned_write) {
+ std::stringstream ss;
+ CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
+
txn_lazy_committer_->resource_manager().get());
+ MetaServiceCode code;
+ std::string msg;
+
+ StopWatch sw;
+ DORIS_CLOUD_DEFER {
+ g_bvar_txn_lazy_committer_commit_partition_duration << sw.elapsed_us();
+ };
+
+ // tablet_id -> TabletIndexPB
+ std::map<int64_t, TabletIndexPB> tablet_ids;
+ int64_t table_id = -1;
+ {
+ DCHECK(tmp_rowset_metas.size() > 0);
+ int64_t first_tablet_id = tmp_rowset_metas.begin()->second.tablet_id();
+ TabletIndexPB first_tablet_index;
+ std::tie(code, msg) =
+ get_tablet_index(meta_reader, txn_kv_.get(), instance_id_,
txn_id_, first_tablet_id,
+ &first_tablet_index, is_versioned_read);
+ if (code != MetaServiceCode::OK) {
+ return {code, msg};
+ }
+ table_id = first_tablet_index.table_id();
+ tablet_ids.emplace(first_tablet_id, first_tablet_index);
+ }
+
+ // The partition version key is constructed during the txn commit process,
so the versionstamp
+ // can be retrieved from the key itself.
+ Versionstamp versionstamp;
+ VersionPB original_partition_version;
+ std::tie(code, msg) = get_partition_version(txn_kv_.get(), instance_id_,
txn_id_, db_id,
+ table_id, partition_id,
&original_partition_version,
+ &versionstamp,
is_versioned_read);
+ if (code != MetaServiceCode::OK) {
+ return {code, msg};
+ } else if (original_partition_version.pending_txn_ids_size() == 0 ||
+ original_partition_version.pending_txn_ids(0) != txn_id_) {
+ // The partition version does not contain the target pending txn, it
might have been committed
+ LOG(INFO) << "txn_id=" << txn_id_ << " partition_id=" << partition_id
+ << " version has already been converted."
+ << " version_pb:" <<
original_partition_version.ShortDebugString();
+
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::already_been_converted",
+ &original_partition_version);
+ return {MetaServiceCode::OK, ""};
+ }
+
+ size_t max_rowset_meta_size = 0;
+ for (const auto& [_, tmp_rowset_pb] : tmp_rowset_metas) {
+ max_rowset_meta_size = std::max(max_rowset_meta_size,
tmp_rowset_pb.ByteSizeLong());
+ }
+
+ // fdb txn limit 10MB, we use 4MB as the max size for each batch.
+ size_t max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch;
+ if (max_rowset_meta_size > 0) {
+ max_rowsets_per_batch = std::min((4UL << 20) / max_rowset_meta_size,
+
size_t(config::txn_lazy_max_rowsets_per_batch));
+
TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
+ &max_rowsets_per_batch,
&max_rowset_meta_size);
+ }
+
+ for (size_t i = 0; i < tmp_rowset_metas.size(); i +=
max_rowsets_per_batch) {
+ size_t end = (i + max_rowsets_per_batch) > tmp_rowset_metas.size()
+ ? tmp_rowset_metas.size()
+ : i + max_rowsets_per_batch;
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
+ sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
+ tmp_rowset_metas.begin() + end);
+ convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code, msg, db_id,
+ sub_partition_tmp_rowset_metas, tablet_ids,
is_versioned_write,
+ is_versioned_read, versionstamp,
+ txn_lazy_committer_->resource_manager().get());
+ if (code != MetaServiceCode::OK) {
+ return {code, msg};
+ }
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "failed to create txn, txn_id=" << txn_id_ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return {code, msg};
+ }
+
+ DCHECK(table_id > 0);
+ DCHECK(partition_id > 0);
+
+ // Notice: get the versionstamp again, since it must not be changed here.
+ // But if you want to support multi pending txns in the partition version,
this must be changed.
+ VersionPB version_pb;
+ std::tie(code, msg) =
+ get_partition_version(txn.get(), instance_id_, txn_id_, db_id,
table_id, partition_id,
+ &version_pb, &versionstamp,
is_versioned_read);
+ if (code != MetaServiceCode::OK) {
+ return {code, msg};
+ }
+
+ if (version_pb.pending_txn_ids_size() > 0 && version_pb.pending_txn_ids(0)
== txn_id_) {
+ DCHECK(version_pb.pending_txn_ids_size() == 1);
+ version_pb.clear_pending_txn_ids();
+
+ if (version_pb.has_version()) {
+ version_pb.set_version(version_pb.version() + 1);
+ } else {
+ // first commit txn version is 2
+ version_pb.set_version(2);
+ }
+ std::string ver_key = partition_version_key({instance_id_, db_id,
table_id, partition_id});
+ std::string ver_val;
+ if (!version_pb.SerializeToString(&ver_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize version_pb when saving, txn_id=" <<
txn_id_;
+ msg = ss.str();
+ return {code, msg};
+ }
+ txn->put(ver_key, ver_val);
+ LOG(INFO) << "put ver_key=" << hex(ver_key) << " txn_id=" << txn_id_
+ << " version_pb=" << version_pb.ShortDebugString();
+ if (is_versioned_write) {
+ // Update the partition version with the specified versionstamp.
+ std::string versioned_key =
+ versioned::partition_version_key({instance_id_,
partition_id});
+ versioned_put(txn.get(), versioned_key, versionstamp, ver_val);
+ LOG(INFO) << "put versioned ver_key=" << hex(versioned_key) << "
txn_id=" << txn_id_
+ << " version_pb=" << version_pb.ShortDebugString();
+ }
+
+ for (auto& [tmp_rowset_key, tmp_rowset_pb] : tmp_rowset_metas) {
+ txn->remove(tmp_rowset_key);
+ LOG(INFO) << "remove tmp_rowset_key=" << hex(tmp_rowset_key) << "
txn_id=" << txn_id_;
+ }
+
+ TEST_SYNC_POINT_CALLBACK("TxnLazyCommitter::commit");
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn, txn_id=" << txn_id_ << " err=" <<
err;
+ msg = ss.str();
+ return {code, msg};
+ }
+ }
+ return {MetaServiceCode::OK, ""};
+}
+
std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::wait() {
constexpr auto WAIT_FOR_MICROSECONDS = 5 * 1000000;
@@ -833,6 +881,14 @@ TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv>
txn_kv,
worker_pool_ =
std::make_unique<SimpleThreadPool>(config::txn_lazy_commit_num_threads,
"txn_lazy_commiter");
worker_pool_->start();
+
+    int32_t parallel_commit_threads = std::max(0, config::parallel_txn_lazy_commit_num_threads);
+    if (parallel_commit_threads == 0) {  // hardware_concurrency() itself may return 0
+        parallel_commit_threads = std::max(1U, std::thread::hardware_concurrency());
+    }
+    parallel_commit_pool_ =
+            std::make_shared<SimpleThreadPool>(parallel_commit_threads, "parallel_commit");
+    parallel_commit_pool_->start();
}
/**
diff --git a/cloud/src/meta-service/txn_lazy_committer.h
b/cloud/src/meta-service/txn_lazy_committer.h
index 602818801c2..8ab62aff89f 100644
--- a/cloud/src/meta-service/txn_lazy_committer.h
+++ b/cloud/src/meta-service/txn_lazy_committer.h
@@ -21,7 +21,10 @@
#include <bthread/mutex.h>
#include <gen_cpp/cloud.pb.h>
+#include <memory>
+
#include "common/simple_thread_pool.h"
+#include "meta-store/clone_chain_reader.h"
#include "meta-store/txn_kv.h"
#include "resource-manager/resource_manager.h"
@@ -42,6 +45,11 @@ public:
private:
friend class TxnLazyCommitter;
+    std::pair<MetaServiceCode, std::string> commit_partition(
+            int64_t db_id, int64_t partition_id,
+            const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
+            bool is_versioned_read, bool is_versioned_write);
+
std::string instance_id_;
int64_t txn_id_;
std::shared_ptr<TxnKv> txn_kv_;
@@ -61,12 +69,14 @@ public:
void remove(int64_t txn_id);
std::shared_ptr<ResourceManager>& resource_manager() { return
resource_mgr_; }
+ std::shared_ptr<SimpleThreadPool>& parallel_commit_pool() { return
parallel_commit_pool_; }
private:
std::shared_ptr<TxnKv> txn_kv_;
std::shared_ptr<ResourceManager> resource_mgr_;
std::unique_ptr<SimpleThreadPool> worker_pool_;
+ std::shared_ptr<SimpleThreadPool> parallel_commit_pool_;
std::mutex mutex_;
// <txn_id, TxnLazyCommitTask>
diff --git a/cloud/src/recycler/sync_executor.h
b/cloud/src/recycler/sync_executor.h
index 95650e5316a..77e0f2dc9ab 100644
--- a/cloud/src/recycler/sync_executor.h
+++ b/cloud/src/recycler/sync_executor.h
@@ -17,133 +17,4 @@
#pragma once
-#include <bthread/countdown_event.h>
-#include <cpp/sync_point.h>
-#include <fmt/core.h>
-#include <gen_cpp/cloud.pb.h>
-#include <glog/logging.h>
-
-#include <chrono>
-#include <future>
-#include <string>
-
-#include "common/defer.h"
-#include "common/simple_thread_pool.h"
-
-namespace doris::cloud {
-
-template <typename T>
-class SyncExecutor {
-public:
- SyncExecutor(
- std::shared_ptr<SimpleThreadPool> pool, std::string name_tag,
- std::function<bool(const T&)> cancel = [](const T& /**/) { return
false; })
- : _pool(std::move(pool)), _cancel(std::move(cancel)),
_name_tag(std::move(name_tag)) {}
- auto add(std::function<T()> callback) -> SyncExecutor<T>& {
- auto task = std::make_unique<Task>(std::move(callback), _cancel,
_count);
- _count.add_count();
- // The actual task logic would be wrapped by one promise and passed to
the threadpool.
- // The result would be returned by the future once the task is
finished.
- // Or the task would be invalid if the whole task is cancelled.
- int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); });
- CHECK(r == 0);
- _res.emplace_back(std::move(task));
- return *this;
- }
- std::vector<T> when_all(bool* finished) {
- DORIS_CLOUD_DEFER {
- _reset();
- };
- timespec current_time;
- auto current_time_second = time(nullptr);
- current_time.tv_sec = current_time_second + 300;
- current_time.tv_nsec = 0;
- // Wait for all tasks to complete
- while (0 != _count.timed_wait(current_time)) {
- current_time.tv_sec += 300;
- LOG(WARNING) << _name_tag << " has already taken 5 min, cost: "
- << time(nullptr) - current_time_second << " seconds";
- }
- *finished = !_stop_token;
- std::vector<T> res;
- res.reserve(_res.size());
- for (auto& task : _res) {
- if (!task->valid()) {
- *finished = false;
- return res;
- }
- size_t max_wait_ms = 10000;
- TEST_SYNC_POINT_CALLBACK("SyncExecutor::when_all.set_wait_time",
&max_wait_ms);
- // _count.timed_wait has already ensured that all tasks are
completed.
- // The 10 seconds here is just waiting for the task results to be
returned,
- // so 10 seconds is more than enough.
- auto status = task->wait_for(max_wait_ms);
- if (status == std::future_status::ready) {
- res.emplace_back(task->get());
- } else {
- *finished = false;
- LOG(WARNING) << _name_tag << " task timed out after 10
seconds";
- return res;
- }
- }
- return res;
- }
-
-private:
- void _reset() {
- _count.reset(0);
- _res.clear();
- _stop_token = false;
- }
-
-private:
- class Task {
- public:
- Task(std::function<T()> callback, std::function<bool(const T&)> cancel,
- bthread::CountdownEvent& count)
- : _callback(std::move(callback)),
- _cancel(std::move(cancel)),
- _count(count),
- _fut(_pro.get_future()) {}
- void operator()(std::atomic_bool& stop_token) {
- DORIS_CLOUD_DEFER {
- _count.signal();
- };
- if (stop_token) {
- _valid = false;
- return;
- }
- T t = _callback();
- // We'll return this task result to user even if this task return
error
- // So we don't set _valid to false here
- if (_cancel(t)) {
- stop_token = true;
- }
- _pro.set_value(std::move(t));
- }
- std::future_status wait_for(size_t milliseconds) {
- return _fut.wait_for(std::chrono::milliseconds(milliseconds));
- }
- bool valid() { return _valid; }
- T get() { return _fut.get(); }
-
- private:
- // It's guarantted that the valid function can only be called inside
SyncExecutor's `when_all()` function
- // and only be called when the _count.timed_wait function returned. So
there would be no data race for
- // _valid then it doesn't need to be one atomic bool.
- bool _valid = true;
- std::function<T()> _callback;
- std::function<bool(const T&)> _cancel;
- std::promise<T> _pro;
- bthread::CountdownEvent& _count;
- std::future<T> _fut;
- };
- std::vector<std::unique_ptr<Task>> _res;
- // use CountdownEvent to do periodically log using
CountdownEvent::time_wait()
- bthread::CountdownEvent _count {0};
- std::atomic_bool _stop_token {false};
- std::shared_ptr<SimpleThreadPool> _pool;
- std::function<bool(const T&)> _cancel;
- std::string _name_tag;
-};
-} // namespace doris::cloud
\ No newline at end of file
+#include "common/sync_executor.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]