This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 201602d3d1f [streamload](2pc) Fix 2pc stream load txn in cloud mode
(#37033)
201602d3d1f is described below
commit 201602d3d1f50eedb8934d808034cb904c379cfb
Author: Gavin Chou <[email protected]>
AuthorDate: Sun Jun 30 20:37:05 2024 +0800
[streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033)
A request to abort a load txn that carries only a label (no txn id) should be
forwarded to the FE master to handle, because the BE lacks the db id.
---
be/src/cloud/cloud_meta_mgr.cpp | 6 +-
be/src/cloud/cloud_stream_load_executor.cpp | 69 ++++++++++++++++++++--
be/src/common/config.cpp | 2 +-
.../runtime/stream_load/stream_load_executor.cpp | 42 ++++++-------
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../load_p0/stream_load/test_stream_load.groovy | 1 +
6 files changed, 89 insertions(+), 33 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index f0a377cba67..732f3023e91 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -839,8 +839,12 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext&
ctx) {
if (ctx.db_id > 0 && !ctx.label.empty()) {
req.set_db_id(ctx.db_id);
req.set_label(ctx.label);
- } else {
+ } else if (ctx.txn_id > 0) {
req.set_txn_id(ctx.txn_id);
+ } else {
+ LOG(WARNING) << "failed abort txn, with illegal input, db_id=" <<
ctx.db_id
+ << " txn_id=" << ctx.txn_id << " label=" << ctx.label;
+ return Status::InternalError<false>("failed to abort txn");
}
return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn);
}
diff --git a/be/src/cloud/cloud_stream_load_executor.cpp
b/be/src/cloud/cloud_stream_load_executor.cpp
index b7d428e59a4..92fb73eacc1 100644
--- a/be/src/cloud/cloud_stream_load_executor.cpp
+++ b/be/src/cloud/cloud_stream_load_executor.cpp
@@ -26,6 +26,12 @@
namespace doris {
+enum class TxnOpParamType : int {
+ ILLEGAL,
+ WITH_TXN_ID,
+ WITH_LABEL,
+};
+
CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env)
: StreamLoadExecutor(exec_env) {}
@@ -42,13 +48,48 @@ Status
CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
}
Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
- VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation;
+ std::stringstream ss;
+ ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" <<
ctx->label
+ << " txn_2pc_op=" << ctx->txn_operation;
+ std::string op_info = ss.str();
+ VLOG_DEBUG << "operate_txn_2pc " << op_info;
+ TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
+ : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
+ : TxnOpParamType::ILLEGAL;
+
+ Status st = Status::InternalError<false>("impossible branch reached, " +
op_info);
+
if (ctx->txn_operation.compare("commit") == 0) {
- return
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
+ if (topt == TxnOpParamType::WITH_TXN_ID) {
+ VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
+ st =
_exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
+ } else if (topt == TxnOpParamType::WITH_LABEL) {
+ VLOG_DEBUG << "2pc commit stream load txn with FE support: " <<
op_info;
+ st = StreamLoadExecutor::operate_txn_2pc(ctx);
+ } else {
+ st = Status::InternalError<false>(
+ "failed to 2pc commit txn, with TxnOpParamType::illegal
input, " + op_info);
+ }
+ } else if (ctx->txn_operation.compare("abort") == 0) {
+ if (topt == TxnOpParamType::WITH_TXN_ID) {
+ LOG(INFO) << "2pc abort stream load txn directly: " << op_info;
+ st =
_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
+ WARN_IF_ERROR(st, "failed to rollback txn " + op_info);
+ } else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send
to FE to abort
+ VLOG_DEBUG << "2pc abort stream load txn with FE support: " <<
op_info;
+ StreamLoadExecutor::rollback_txn(ctx);
+ st = Status::OK();
+ } else {
+ st = Status::InternalError<false>("failed abort txn, with illegal
input, " + op_info);
+ }
} else {
- // 2pc abort
- return
_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
+ std::string msg =
+ "failed to operate_txn_2pc, unrecognized operation: " +
ctx->txn_operation;
+ LOG(WARNING) << msg << " " << op_info;
+ st = Status::InternalError<false>(msg + " " + op_info);
}
+ WARN_IF_ERROR(st, "failed to operate_txn_2pc " + op_info)
+ return st;
}
Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
@@ -85,8 +126,24 @@ Status
CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
}
void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
-
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
- "Failed to rollback txn");
+ std::stringstream ss;
+ ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" <<
ctx->label;
+ std::string op_info = ss.str();
+ LOG(INFO) << "rollback stream laod txn " << op_info;
+ TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
+ : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
+ : TxnOpParamType::ILLEGAL;
+
+ if (topt == TxnOpParamType::WITH_TXN_ID) {
+ VLOG_DEBUG << "abort stream load txn directly: " << op_info;
+
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
+ "failed to rollback txn " + op_info);
+ } else { // maybe a label send to FE to abort
+ // does not care about the return status
+ // ctx->db_id > 0 && !ctx->label.empty()
+ VLOG_DEBUG << "abort stream load txn with FE support: " << op_info;
+ StreamLoadExecutor::rollback_txn(ctx);
+ }
}
} // namespace doris
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9df75b97bd6..c2274fd169b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -139,7 +139,7 @@ DEFINE_mBool(enable_stacktrace_in_allocator_check_failed,
"false");
DEFINE_mInt64(large_memory_check_bytes, "2147483648");
-DEFINE_mBool(enable_memory_orphan_check, "true");
+DEFINE_mBool(enable_memory_orphan_check, "false");
// The maximum time a thread waits for full GC. Currently only query will wait
for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index d26beb66827..2bd1c16199d 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -166,9 +166,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
TLoadTxnBeginRequest request;
set_request_auth(&request, ctx->auth);
- request.db = ctx->db;
- request.tbl = ctx->table;
- request.label = ctx->label;
+ request.__set_db(ctx->db);
+ request.__set_tbl(ctx->table);
+ request.__set_label(ctx->label);
// set timestamp
request.__set_timestamp(GetCurrentTimeMicros());
if (ctx->timeout_second != -1) {
@@ -286,27 +286,23 @@ Status
StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
TLoadTxnCommitRequest& request) {
set_request_auth(&request, ctx->auth);
- request.db = ctx->db;
+ request.__set_db(ctx->db);
if (ctx->db_id > 0) {
- request.db_id = ctx->db_id;
- request.__isset.db_id = true;
+ request.__set_db_id(ctx->db_id);
}
- request.tbl = ctx->table;
- request.txnId = ctx->txn_id;
- request.sync = true;
- request.commitInfos = ctx->commit_infos;
- request.__isset.commitInfos = true;
+ request.__set_tbl(ctx->table);
+ request.__set_txnId(ctx->txn_id);
+ request.__set_sync(true);
+ request.__set_commitInfos(ctx->commit_infos);
request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
- request.tbls = ctx->table_list;
- request.__isset.tbls = true;
+ request.__set_tbls(ctx->table_list);
VLOG_DEBUG << "commit txn request:" <<
apache::thrift::ThriftDebugString(request);
// set attachment if has
TTxnCommitAttachment attachment;
if (collect_load_stat(ctx, &attachment)) {
- request.txnCommitAttachment = attachment;
- request.__isset.txnCommitAttachment = true;
+ request.__set_txnCommitAttachment(attachment);
}
}
@@ -353,22 +349,20 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext*
ctx) {
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
TLoadTxnRollbackRequest request;
set_request_auth(&request, ctx->auth);
- request.db = ctx->db;
+ request.__set_db(ctx->db);
if (ctx->db_id > 0) {
- request.db_id = ctx->db_id;
- request.__isset.db_id = true;
+ request.__set_db_id(ctx->db_id);
}
- request.tbl = ctx->table;
- request.txnId = ctx->txn_id;
+ request.__set_tbl(ctx->table);
+ request.__set_txnId(ctx->txn_id);
request.__set_reason(ctx->status.to_string());
- request.tbls = ctx->table_list;
- request.__isset.tbls = true;
+ request.__set_tbls(ctx->table_list);
+ request.__set_label(ctx->label);
// set attachment if has
TTxnCommitAttachment attachment;
if (collect_load_stat(ctx, &attachment)) {
- request.txnCommitAttachment = attachment;
- request.__isset.txnCommitAttachment = true;
+ request.__set_txnCommitAttachment(attachment);
}
TLoadTxnRollbackResult result;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 75a1987eb23..b7e11be47be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1823,7 +1823,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new MetaNotFoundException("db " + request.getDb() + " does
not exist");
}
long dbId = db.getId();
- if (request.getTxnId() != 0) { // txnId is required in thrift
+ if (request.getTxnId() > 0) { // txnId is required in thrift
TransactionState transactionState =
Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, request.getTxnId());
if (transactionState == null) {
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 07decba0950..660ad50bd9d 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1166,6 +1166,7 @@ suite("test_stream_load", "p0") {
requestBuilder.setHeader("Expect", "100-Continue")
requestBuilder.setHeader("label", "${label}")
requestBuilder.setHeader("txn_operation", "${txn_operation}")
+ log.info("stream load request " + requestBuilder.toString())
String backendStreamLoadUri = null
client.execute(requestBuilder.build()).withCloseable { resp ->
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]