This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e3c1aff1320 branch-4.0: [fix](transaction) ensure executing
before/after abort when abort transaction in cloud mode #59425 (#59907)
e3c1aff1320 is described below
commit e3c1aff1320a17eff9d1ea5f4a6fa0e702224872
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 16 10:57:49 2026 +0800
branch-4.0: [fix](transaction) ensure executing before/after abort when
abort transaction in cloud mode #59425 (#59907)
Cherry-picked from #59425
Co-authored-by: hui lai <[email protected]>
---
cloud/src/common/bvars.cpp | 5 +
cloud/src/common/bvars.h | 3 +
cloud/src/meta-service/meta_service.h | 13 +++
cloud/src/meta-service/meta_service_txn.cpp | 85 ++++++++++++++++
cloud/test/meta_service_test.cpp | 109 +++++++++++++++++++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 13 +++
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 5 +
.../transaction/CloudGlobalTransactionMgr.java | 81 +++++++++------
.../load/routineload/RoutineLoadTaskInfo.java | 15 ++-
.../java/org/apache/doris/system/HeartbeatMgr.java | 9 ++
gensrc/proto/cloud.proto | 14 +++
.../regression/util/RoutineLoadTestUtils.groovy | 4 +-
.../test_routine_load_be_restart.groovy | 98 ++++++++++++++++++
13 files changed, 417 insertions(+), 37 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index e35e5e677f6..6c3ba1eee50 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -43,6 +43,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms",
"begin_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms",
"check_txn_conflict");
BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator("ms",
"abort_txn_with_coordinator");
+BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator("ms",
"get_prepare_txn_by_coordinator");
BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label");
BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version");
BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version("ms",
"batch_get_version");
@@ -435,6 +436,8 @@ mBvarInt64Adder
g_bvar_rpc_kv_abort_sub_txn_get_counter("rpc_kv_abort_sub_txn_ge
mBvarInt64Adder
g_bvar_rpc_kv_abort_sub_txn_put_counter("rpc_kv_abort_sub_txn_put_counter",{"instance_id"});
// abort_txn_with_coordinator
mBvarInt64Adder
g_bvar_rpc_kv_abort_txn_with_coordinator_get_counter("rpc_kv_abort_txn_with_coordinator_get_counter",{"instance_id"});
+// get_prepare_txn_by_coordinator
+mBvarInt64Adder
g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_counter("rpc_kv_get_prepare_txn_by_coordinator_get_counter",{"instance_id"});
// check_txn_conflict
mBvarInt64Adder
g_bvar_rpc_kv_check_txn_conflict_get_counter("rpc_kv_check_txn_conflict_get_counter",{"instance_id"});
// clean_txn_label
@@ -635,6 +638,8 @@ mBvarInt64Adder
g_bvar_rpc_kv_abort_sub_txn_get_bytes("rpc_kv_abort_sub_txn_get_
mBvarInt64Adder
g_bvar_rpc_kv_abort_sub_txn_put_bytes("rpc_kv_abort_sub_txn_put_bytes",{"instance_id"});
// abort_txn_with_coordinator
mBvarInt64Adder
g_bvar_rpc_kv_abort_txn_with_coordinator_get_bytes("rpc_kv_abort_txn_with_coordinator_get_bytes",{"instance_id"});
+// get_prepare_txn_by_coordinator
+mBvarInt64Adder
g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_bytes("rpc_kv_get_prepare_txn_by_coordinator_get_bytes",{"instance_id"});
// check_txn_conflict
mBvarInt64Adder
g_bvar_rpc_kv_check_txn_conflict_get_bytes("rpc_kv_check_txn_conflict_get_bytes",{"instance_id"});
// clean_txn_label
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 25e324b3233..798f8f82433 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -553,6 +553,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator;
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label;
@@ -869,6 +870,7 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_begin_sub_txn_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_with_coordinator_get_counter;
+extern mBvarInt64Adder
g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_check_txn_conflict_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_put_counter;
@@ -1007,6 +1009,7 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_begin_sub_txn_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_with_coordinator_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_check_txn_conflict_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_put_bytes;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index c801d438faf..e6747f6326d 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -144,6 +144,11 @@ public:
AbortTxnWithCoordinatorResponse* response,
::google::protobuf::Closure* done)
override;
+ void get_prepare_txn_by_coordinator(::google::protobuf::RpcController*
controller,
+ const
GetPrepareTxnByCoordinatorRequest* request,
+ GetPrepareTxnByCoordinatorResponse*
response,
+ ::google::protobuf::Closure* done)
override;
+
void clean_txn_label(::google::protobuf::RpcController* controller,
const CleanTxnLabelRequest* request,
CleanTxnLabelResponse* response,
::google::protobuf::Closure* done) override;
@@ -592,6 +597,14 @@ public:
done);
}
+ void get_prepare_txn_by_coordinator(::google::protobuf::RpcController*
controller,
+ const
GetPrepareTxnByCoordinatorRequest* request,
+ GetPrepareTxnByCoordinatorResponse*
response,
+ ::google::protobuf::Closure* done)
override {
+ call_impl(&cloud::MetaService::get_prepare_txn_by_coordinator,
controller, request,
+ response, done);
+ }
+
void clean_txn_label(::google::protobuf::RpcController* controller,
const CleanTxnLabelRequest* request,
CleanTxnLabelResponse* response,
::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index ab6e2fedfe5..9b1cfc49836 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3896,6 +3896,91 @@ void
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll
}
}
+void MetaServiceImpl::get_prepare_txn_by_coordinator(
+ ::google::protobuf::RpcController* controller,
+ const GetPrepareTxnByCoordinatorRequest* request,
+ GetPrepareTxnByCoordinatorResponse* response,
::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(get_prepare_txn_by_coordinator, get);
+ if (!request->has_id() || !request->has_ip()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid coordinate id or coordinate ip.";
+ return;
+ }
+ // TODO: For auth
+ std::string cloud_unique_id = request->has_cloud_unique_id() ?
request->cloud_unique_id() : "";
+ instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "cannot find instance_id with cloud_unique_id="
+ << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+ msg = ss.str();
+ return;
+ }
+ RPC_RATE_LIMIT(get_prepare_txn_by_coordinator);
+ std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+ std::string end_info_key = txn_info_key({instance_id, INT64_MAX,
INT64_MAX});
+ LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:"
<< hex(end_info_key);
+
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ msg = "failed to create txn";
+ code = cast_as<ErrCategory::CREATE>(err);
+ return;
+ }
+ std::unique_ptr<RangeGetIterator> it;
+ int32_t result_count = 0;
+ int64_t total_iteration_cnt = 0;
+ bool has_start_time_filter = request->has_start_time();
+
+ do {
+ err = txn->get(begin_info_key, end_info_key, &it, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get txn info. err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ while (it->has_next()) {
+ total_iteration_cnt++;
+ auto [k, v] = it->next();
+ VLOG_DEBUG << "check txn info txn_info_key=" << hex(k);
+ TxnInfoPB info_pb;
+ if (!info_pb.ParseFromArray(v.data(), v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed txn running info";
+ msg = ss.str();
+ ss << " key=" << hex(k);
+ LOG(WARNING) << ss.str();
+ return;
+ }
+ const auto& coordinate = info_pb.coordinator();
+ bool matches = info_pb.status() ==
TxnStatusPB::TXN_STATUS_PREPARED &&
+ coordinate.sourcetype() == TXN_SOURCE_TYPE_BE &&
+ coordinate.ip() == request->ip() &&
+ (coordinate.id() == 0 || coordinate.id() ==
request->id());
+ if (matches && has_start_time_filter) {
+ matches = coordinate.start_time() < request->start_time();
+ }
+
+ if (matches) {
+ TxnInfoPB* txn_info = response->add_txn_infos();
+ txn_info->CopyFrom(info_pb);
+ result_count++;
+ }
+
+ if (!it->has_next()) {
+ begin_info_key = k;
+ }
+ }
+ begin_info_key.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+
+ LOG(INFO) << "get_prepare_txn_by_coordinator: found " << result_count << "
transactions"
+ << " total iteration count: " << total_iteration_cnt;
+}
+
std::string get_txn_info_key_from_txn_running_key(std::string_view
txn_running_key) {
std::string conflict_txn_info_key;
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index e93961a8091..f9197fd5d42 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -2742,6 +2742,115 @@ TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) {
ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 0);
}
+TEST(MetaServiceTest, GetPrepareTxnByCoordinatorTest) {
+ auto meta_service = get_meta_service();
+
+ const int64_t db_id = 888;
+ const int64_t table_id = 999;
+ const std::string cloud_unique_id = "test_cloud_unique_id";
+ const int64_t coordinator_id = 12345;
+ int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ std::string host = "127.0.0.1:9050";
+
+ // Create multiple transactions with the same coordinator
+ std::vector<int64_t> txn_ids;
+ for (int i = 0; i < 5; ++i) {
+ brpc::Controller begin_txn_cntl;
+ BeginTxnRequest begin_txn_req;
+ BeginTxnResponse begin_txn_res;
+ TxnInfoPB txn_info_pb;
+ TxnCoordinatorPB coordinator;
+
+ begin_txn_req.set_cloud_unique_id(cloud_unique_id);
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label("test_label_" + std::to_string(i));
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(36000);
+ coordinator.set_id(coordinator_id);
+ coordinator.set_ip(host);
+
coordinator.set_sourcetype(::doris::cloud::TxnSourceTypePB::TXN_SOURCE_TYPE_BE);
+ coordinator.set_start_time(cur_time);
+ txn_info_pb.mutable_coordinator()->CopyFrom(coordinator);
+ begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb);
+
+ meta_service->begin_txn(
+
reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl),
+ &begin_txn_req, &begin_txn_res, nullptr);
+ ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK);
+ txn_ids.push_back(begin_txn_res.txn_id());
+ }
+
+ // Test 1: Get all prepared transactions without limit
+ {
+ brpc::Controller cntl;
+ GetPrepareTxnByCoordinatorRequest req;
+ GetPrepareTxnByCoordinatorResponse resp;
+
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_id(coordinator_id);
+ req.set_ip(host);
+
+ meta_service->get_prepare_txn_by_coordinator(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &resp, nullptr);
+
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(resp.txn_infos_size(), 5);
+ }
+
+ // Test 2: Get prepared transactions with start_time filter
+ {
+ brpc::Controller cntl;
+ GetPrepareTxnByCoordinatorRequest req;
+ GetPrepareTxnByCoordinatorResponse resp;
+
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_id(coordinator_id);
+ req.set_ip(host);
+ req.set_start_time(cur_time + 100); // Future time, should match all
transactions
+
+ meta_service->get_prepare_txn_by_coordinator(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &resp, nullptr);
+
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(resp.txn_infos_size(), 5);
+ }
+
+ // Test 3: Get prepared transactions with invalid coordinator
+ {
+ brpc::Controller cntl;
+ GetPrepareTxnByCoordinatorRequest req;
+ GetPrepareTxnByCoordinatorResponse resp;
+
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_id(99999); // Non-existent coordinator
+ req.set_ip("192.168.1.1:9999");
+
+ meta_service->get_prepare_txn_by_coordinator(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &resp, nullptr);
+
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(resp.txn_infos_size(), 0); // Should return empty list
+ }
+
+ // Test 4: Invalid request without coordinator id
+ {
+ brpc::Controller cntl;
+ GetPrepareTxnByCoordinatorRequest req;
+ GetPrepareTxnByCoordinatorResponse resp;
+
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_ip(host);
+ // Missing coordinator id
+
+ meta_service->get_prepare_txn_by_coordinator(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &resp, nullptr);
+
+ ASSERT_EQ(resp.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ }
+}
+
TEST(MetaServiceTest, CheckTxnConflictTest) {
auto meta_service = get_meta_service();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 3f0fe453f96..a27a823c7c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -486,6 +486,19 @@ public class MetaServiceClient {
.abortTxnWithCoordinator(request);
}
+ public Cloud.GetPrepareTxnByCoordinatorResponse
+ getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest
request) {
+ if (!request.hasCloudUniqueId()) {
+ Cloud.GetPrepareTxnByCoordinatorRequest.Builder builder =
+ Cloud.GetPrepareTxnByCoordinatorRequest.newBuilder();
+ builder.mergeFrom(request);
+ return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
+
.getPrepareTxnByCoordinator(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+ }
+ return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
+ .getPrepareTxnByCoordinator(request);
+ }
+
public Cloud.FinishTabletJobResponse
finishTabletJob(Cloud.FinishTabletJobRequest request) {
if (!request.hasCloudUniqueId()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 567d99c1939..60bef07b638 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -465,6 +465,11 @@ public class MetaServiceProxy {
return w.executeRequest((client) ->
client.abortTxnWithCoordinator(request));
}
+ public Cloud.GetPrepareTxnByCoordinatorResponse
+ getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest
request) throws RpcException {
+ return w.executeRequest((client) ->
client.getPrepareTxnByCoordinator(request));
+ }
+
public Cloud.CreateInstanceResponse
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
return w.executeRequest((client) -> client.createInstance(request));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c791ccdd27e..b1ccee66f51 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -32,8 +32,6 @@ import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest;
import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
-import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorRequest;
-import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorResponse;
import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest;
import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse;
import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
@@ -48,6 +46,8 @@ import
org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
+import org.apache.doris.cloud.proto.Cloud.GetPrepareTxnByCoordinatorRequest;
+import org.apache.doris.cloud.proto.Cloud.GetPrepareTxnByCoordinatorResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
@@ -2132,48 +2132,69 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
// do nothing in cloud mode
}
- @Override
- public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String
coordinateHost, long beStartTime) {
- AbortTxnWithCoordinatorRequest.Builder builder =
AbortTxnWithCoordinatorRequest.newBuilder()
+ private List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long
coordinateBeId,
+ String coordinateHost, long beStartTime) {
+ List<Pair<Long, Long>> txnInfos = new ArrayList<>();
+ GetPrepareTxnByCoordinatorRequest.Builder builder =
GetPrepareTxnByCoordinatorRequest.newBuilder()
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
builder.setIp(coordinateHost);
builder.setId(coordinateBeId);
- builder.setStartTime(beStartTime);
- final AbortTxnWithCoordinatorRequest request = builder.build();
- AbortTxnWithCoordinatorResponse response = null;
+ if (beStartTime > 0) {
+ builder.setStartTime(beStartTime);
+ }
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+
+ final GetPrepareTxnByCoordinatorRequest request = builder.build();
+ GetPrepareTxnByCoordinatorResponse response = null;
try {
- response = MetaServiceProxy
- .getInstance().abortTxnWithCoordinator(request);
- LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
- if
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
- LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow
enabled, sleep 15s");
- try {
- Thread.sleep(15 * 1000);
- } catch (InterruptedException ie) {
- LOG.info("error ", ie);
+ response =
MetaServiceProxy.getInstance().getPrepareTxnByCoordinator(request);
+ if (response.getStatus().getCode() == MetaServiceCode.OK) {
+ for (TxnInfoPB txnInfo : response.getTxnInfosList()) {
+ txnInfos.add(Pair.of(txnInfo.getDbId(),
txnInfo.getTxnId()));
}
+ } else {
+ LOG.warn("Get prepare txn by coordinator BE {} failed,
code={}, msg={}",
+ coordinateHost, response.getStatus().getCode(),
response.getStatus().getMsg());
}
} catch (RpcException e) {
- LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
+ LOG.warn("Get prepare txn by coordinator BE {} failed, msg={}",
coordinateHost, e.getMessage());
+ }
+ return txnInfos;
+ }
+
+ private void abortTransactionsByCoordinateBe(
+ List<Pair<Long, Long>> transactions, String coordinateHost,
String reason) {
+ for (Pair<Long, Long> txnInfo : transactions) {
+ try {
+ abortTransaction(txnInfo.first, txnInfo.second, reason);
+ } catch (UserException e) {
+ LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
+ }
}
}
@Override
- public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String
coordinateHost, int limit) {
- AbortTxnWithCoordinatorRequest.Builder builder =
AbortTxnWithCoordinatorRequest.newBuilder();
- builder.setIp(coordinateHost);
- builder.setId(coordinateBeId);
- final AbortTxnWithCoordinatorRequest request = builder.build();
- AbortTxnWithCoordinatorResponse response = null;
- try {
- response = MetaServiceProxy
- .getInstance().abortTxnWithCoordinator(request);
- LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
- } catch (RpcException e) {
- LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
+ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String
coordinateHost, long beStartTime) {
+ List<Pair<Long, Long>> transactionIdByCoordinateBe
+ = getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, beStartTime);
+ abortTransactionsByCoordinateBe(transactionIdByCoordinateBe,
coordinateHost, "coordinate BE restart");
+ if
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
+ LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow
enabled, sleep 15s");
+ try {
+ Thread.sleep(15 * 1000);
+ } catch (InterruptedException ie) {
+ LOG.info("error ", ie);
+ }
}
}
+ @Override
+ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String
coordinateHost, int limit) {
+ List<Pair<Long, Long>> transactionIdByCoordinateBe
+ = getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, 0);
+ abortTransactionsByCoordinateBe(transactionIdByCoordinateBe,
coordinateHost, "coordinate BE is down");
+ }
+
@Override
public TransactionStatus getLabelState(long dbId, String label) throws
AnalysisException {
GetTxnRequest.Builder builder = GetTxnRequest.newBuilder()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index aa8c2ccc63c..9ed0ddf4c38 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -28,8 +28,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.service.ExecuteEnv;
-import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
@@ -205,11 +204,17 @@ public abstract class RoutineLoadTaskInfo {
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
try {
+ TxnCoordinator coordinator;
+ Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+ if (backend != null) {
+ long startTime = backend.getLastStartTime();
+ coordinator = new TxnCoordinator(TxnSourceType.BE, beId,
backend.getHost(), startTime);
+ } else {
+ throw new UserException("Backend not found for beId: " + beId);
+ }
txnId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()),
DebugUtil.printId(id), null,
- new TxnCoordinator(TxnSourceType.FE, 0,
- FrontendOptions.getLocalHostAddress(),
- ExecuteEnv.getInstance().getStartupTime()),
+ coordinator,
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK,
routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 077db66712a..4500ce6da0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -193,9 +193,18 @@ public class HeartbeatMgr extends MasterDaemon {
if (be != null) {
long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse,
isReplay);
+ if
(DebugPointUtil.isEnable("HeartbeatMgr.abortTxnWhenCoordinateBeDown")) {
+ submitAbortTxnTaskByExecutor(() ->
Env.getCurrentGlobalTransactionMgr()
+ .abortTxnWhenCoordinateBeDown(be.getId(),
be.getHost(), 100), "down");
+ }
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
// oldStartTime > 0 means it is not the first heartbeat
+ if
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart")) {
+ submitAbortTxnTaskByExecutor(() ->
Env.getCurrentGlobalTransactionMgr()
+
.abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime + 1000),
+ "restart");
+ }
if (!isReplay &&
Config.enable_abort_txn_by_checking_coordinator_be
&& oldStartTime != newStartTime &&
oldStartTime > 0) {
submitAbortTxnTaskByExecutor(() ->
Env.getCurrentGlobalTransactionMgr()
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index ba0fbd7cc53..08904998a3c 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1107,6 +1107,19 @@ message AbortTxnWithCoordinatorResponse {
optional MetaServiceResponseStatus status = 1;
}
+message GetPrepareTxnByCoordinatorRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional string ip = 2;
+ optional int64 id = 3;
+ optional int64 start_time = 4;
+ optional string request_ip = 5;
+}
+
+message GetPrepareTxnByCoordinatorResponse {
+ optional MetaServiceResponseStatus status = 1;
+ repeated TxnInfoPB txn_infos = 2;
+}
+
message CheckTxnConflictRequest {
optional string cloud_unique_id = 1; // For auth
optional int64 db_id = 2;
@@ -2203,6 +2216,7 @@ service MetaService {
rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse);
rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse);
rpc abort_txn_with_coordinator(AbortTxnWithCoordinatorRequest) returns
(AbortTxnWithCoordinatorResponse);
+ rpc get_prepare_txn_by_coordinator(GetPrepareTxnByCoordinatorRequest)
returns (GetPrepareTxnByCoordinatorResponse);
rpc get_version(GetVersionRequest) returns (GetVersionResponse);
rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index 9a5e27d2680..a6120241a5e 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -129,7 +129,7 @@ class RoutineLoadTestUtils {
return count
}
- static void waitForTaskAbort(Closure sqlRunner, String job, int
maxAttempts = 60) {
+ static void waitForTaskAbort(Closure sqlRunner, String job, int
maxAttempts = 60, int expectedAbortedTaskNum = 1) {
def count = 0
while (true) {
def res = sqlRunner.call("show routine load for ${job}")
@@ -137,7 +137,7 @@ class RoutineLoadTestUtils {
logger.info("Routine load statistic: ${statistic}")
def jsonSlurper = new JsonSlurper()
def json = jsonSlurper.parseText(res[0][14])
- if (json.abortedTaskNum > 1) {
+ if (json.abortedTaskNum > expectedAbortedTaskNum) {
break
}
if (count > maxAttempts) {
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
new file mode 100644
index 00000000000..74e93dc22bd
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
@@ -0,0 +1,98 @@
+ // Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for
+// the specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_be_restart","nonConcurrent") {
+ def kafkaCsvTopics = [
+ "test_routine_load_be_restart",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_be_restart"
+ def job = "test_routine_load_be_restart"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTopics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def injection_load_hang = "load.commit_timeout"
+ def injection_abort_txn = "FE.abortTxnWhenCoordinateBeRestart"
+ // test coordinator be restart
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs(injection_load_hang)
+ RoutineLoadTestUtils.sendTestDataToKafka(producer,
kafkaCsvTopics)
+ GetDebugPoint().enableDebugPointForAllFEs(injection_abort_txn)
+ RoutineLoadTestUtils.waitForTaskAbort(runSql, job, 60)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs(injection_abort_txn)
+ GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
+ }
+
+ // test coordinator be down
+ def injection_abort_txn_be_down =
"HeartbeatMgr.abortTxnWhenCoordinateBeDown"
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs(injection_load_hang)
+ RoutineLoadTestUtils.sendTestDataToKafka(producer,
kafkaCsvTopics)
+
GetDebugPoint().enableDebugPointForAllFEs(injection_abort_txn_be_down)
+ RoutineLoadTestUtils.waitForTaskAbort(runSql, job, 60, 2)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllFEs(injection_abort_txn_be_down)
+ GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
+ }
+ def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job,
tableName, 0)
+ logger.info("wait count: " + count)
+ } finally {
+ sql "stop routine load for ${job}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org