This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 30da34a9280 [fix](cloud) fix inconsistent response between cloud mode
and local mode for streamload 2PC (#38076)
30da34a9280 is described below
commit 30da34a92800a8f72e1ee13dc5805d0aefc7cd23
Author: Xin Liao <[email protected]>
AuthorDate: Fri Jul 19 09:04:35 2024 +0800
[fix](cloud) fix inconsistent response between cloud mode and local mode for
streamload 2PC (#38076)
fix the regression test case of flink_connector_response for cloud p0.
---
cloud/src/meta-service/meta_service_txn.cpp | 19 ++++++++++++++-----
.../cloud/transaction/CloudGlobalTransactionMgr.java | 7 ++++---
2 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 4d48d7c9df4..0a3439e94f7 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -826,7 +826,12 @@ void commit_txn_immediately(
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
:
cast_as<ErrCategory::READ>(err);
- ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" <<
txn_id << " err=" << err;
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
+ } else {
+ ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" <<
txn_id
+ << " err=" << err;
+ }
msg = ss.str();
LOG(WARNING) << msg;
return;
@@ -845,7 +850,7 @@ void commit_txn_immediately(
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
- ss << "transaction is already aborted: db_id=" << db_id << " txn_id="
<< txn_id;
+ ss << "transaction [" << txn_id << "] is already aborted, db_id=" <<
db_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
@@ -1868,7 +1873,11 @@ void
MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
:
cast_as<ErrCategory::READ>(err);
- ss << "failed to get db id, txn_id=" << txn_id << " err=" <<
err;
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ ss << "transaction [" << txn_id << "] not found";
+ } else {
+ ss << "failed to get txn info, txn_id=" << txn_id << "
err=" << err;
+ }
msg = ss.str();
return;
}
@@ -1911,13 +1920,13 @@ void
MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
//check state is valid.
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
- ss << "transaction is already abort db_id=" << db_id << "txn_id="
<< txn_id;
+ ss << "transaction [" << txn_id << "] is already aborted, db_id="
<< db_id;
msg = ss.str();
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
code = MetaServiceCode::TXN_ALREADY_VISIBLE;
- ss << "transaction is already visible db_id=" << db_id <<
"txn_id=" << txn_id;
+ ss << "transaction [" << txn_id << "] is already VISIBLE, db_id="
<< db_id;
msg = ss.str();
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 727192b4e57..a4d0e582471 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -529,6 +529,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
throw new UserException("commitTxn() failed, errMsg:" +
e.getMessage());
}
+ if (is2PC && (commitTxnResponse.getStatus().getCode() ==
MetaServiceCode.TXN_ALREADY_VISIBLE
+ || commitTxnResponse.getStatus().getCode() ==
MetaServiceCode.TXN_ALREADY_ABORTED)) {
+ throw new UserException(commitTxnResponse.getStatus().getMsg());
+ }
if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK
&& commitTxnResponse.getStatus().getCode() !=
MetaServiceCode.TXN_ALREADY_VISIBLE) {
LOG.warn("commitTxn failed, transactionId:{}, retryTime:{},
commitTxnResponse:{}",
@@ -545,9 +549,6 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
throw new UserException("internal error, " +
internalMsgBuilder.toString());
}
- if (is2PC && commitTxnResponse.getStatus().getCode() ==
MetaServiceCode.TXN_ALREADY_VISIBLE) {
- throw new UserException(commitTxnResponse.getStatus().getMsg());
- }
TransactionState txnState =
TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
TxnStateChangeCallback cb =
callbackFactory.getCallback(txnState.getCallbackId());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]