This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 29b99add5fa [fix](cloud) provide a conf to enable/disable streamload commit on be (#37855)
29b99add5fa is described below
commit 29b99add5fae9efe6fd9f6c49ddbae0339729dcf
Author: Yongqiang YANG <[email protected]>
AuthorDate: Wed Jul 17 17:32:25 2024 +0800
[fix](cloud) provide a conf to enable/disable streamload commit on be (#37855)
## Proposed changes
Issue Number: close #xxx
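<!--Describe your changes.-->
Add a BE config, `enable_stream_load_commit_txn_on_be` (default `false`), that
controls whether cloud stream load transactions are committed directly on BE.
When it is disabled, a 2PC "commit" is delegated to
`StreamLoadExecutor::operate_txn_2pc`, and `commit_txn` takes the
forward-to-FE path that MoW tables and routine load also take.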
---------
Signed-off-by: freemandealer <[email protected]>
Co-authored-by: freemandealer <[email protected]>
---
be/src/cloud/cloud_stream_load_executor.cpp | 12 ++++++------
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 2 ++
3 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp
index a87f37a5188..1b8167c96eb 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -60,7 +60,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
Status st = Status::InternalError<false>("impossible branch reached, " + op_info);
if (ctx->txn_operation.compare("commit") == 0) {
- if (topt == TxnOpParamType::WITH_TXN_ID) {
+ if (!config::enable_stream_load_commit_txn_on_be) {
+ VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
+ st = StreamLoadExecutor::operate_txn_2pc(ctx);
+ } else if (topt == TxnOpParamType::WITH_TXN_ID) {
VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
} else if (topt == TxnOpParamType::WITH_LABEL) {
@@ -93,12 +96,9 @@ Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
}
Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
- if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
- return StreamLoadExecutor::commit_txn(ctx);
- }
-
// forward to fe to excute commit transaction for MoW table
- if (ctx->is_mow_table()) {
+ if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
+ ctx->load_type == TLoadType::ROUTINE_LOAD) {
Status st;
int retry_times = 0;
while (retry_times < config::mow_stream_load_commit_retry_times) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c836bd3fb33..06156fd8598 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -541,6 +541,8 @@ DEFINE_mInt32(stream_load_record_batch_size, "50");
DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
// time interval to clean expired stream load records
DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
// The buffer size to store stream table function schema info
DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8860e40b7ef..f0d127df0fc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -595,6 +595,8 @@ DECLARE_mInt32(stream_load_record_batch_size);
DECLARE_Int32(stream_load_record_expire_time_secs);
// time interval to clean expired stream load records
DECLARE_mInt64(clean_stream_load_record_interval_secs);
+// enable stream load commit txn on BE directly, bypassing FE. Only for cloud.
+DECLARE_mBool(enable_stream_load_commit_txn_on_be);
// The buffer size to store stream table function schema info
DECLARE_Int64(stream_tvf_buffer_size);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]