This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e3c1aff1320 branch-4.0: [fix](transaction) ensure executing before/after abort when abort transaction in cloud mode #59425 (#59907)
e3c1aff1320 is described below

commit e3c1aff1320a17eff9d1ea5f4a6fa0e702224872
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 16 10:57:49 2026 +0800

    branch-4.0: [fix](transaction) ensure executing before/after abort when abort transaction in cloud mode #59425 (#59907)
    
    Cherry-picked from #59425
    
    Co-authored-by: hui lai <[email protected]>
---
 cloud/src/common/bvars.cpp                         |   5 +
 cloud/src/common/bvars.h                           |   3 +
 cloud/src/meta-service/meta_service.h              |  13 +++
 cloud/src/meta-service/meta_service_txn.cpp        |  85 ++++++++++++++++
 cloud/test/meta_service_test.cpp                   | 109 +++++++++++++++++++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |  13 +++
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |   5 +
 .../transaction/CloudGlobalTransactionMgr.java     |  81 +++++++++------
 .../load/routineload/RoutineLoadTaskInfo.java      |  15 ++-
 .../java/org/apache/doris/system/HeartbeatMgr.java |   9 ++
 gensrc/proto/cloud.proto                           |  14 +++
 .../regression/util/RoutineLoadTestUtils.groovy    |   4 +-
 .../test_routine_load_be_restart.groovy            |  98 ++++++++++++++++++
 13 files changed, 417 insertions(+), 37 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index e35e5e677f6..6c3ba1eee50 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -43,6 +43,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", 
"begin_sub_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", 
"check_txn_conflict");
 BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator("ms", 
"abort_txn_with_coordinator");
+BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator("ms", 
"get_prepare_txn_by_coordinator");
 BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label");
 BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version");
 BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version("ms", 
"batch_get_version");
@@ -435,6 +436,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_abort_sub_txn_get_counter("rpc_kv_abort_sub_txn_ge
 mBvarInt64Adder 
g_bvar_rpc_kv_abort_sub_txn_put_counter("rpc_kv_abort_sub_txn_put_counter",{"instance_id"});
 // abort_txn_with_coordinator
 mBvarInt64Adder 
g_bvar_rpc_kv_abort_txn_with_coordinator_get_counter("rpc_kv_abort_txn_with_coordinator_get_counter",{"instance_id"});
+// get_prepare_txn_by_coordinator
+mBvarInt64Adder 
g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_counter("rpc_kv_get_prepare_txn_by_coordinator_get_counter",{"instance_id"});
 // check_txn_conflict
 mBvarInt64Adder 
g_bvar_rpc_kv_check_txn_conflict_get_counter("rpc_kv_check_txn_conflict_get_counter",{"instance_id"});
 // clean_txn_label
@@ -635,6 +638,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_abort_sub_txn_get_bytes("rpc_kv_abort_sub_txn_get_
 mBvarInt64Adder 
g_bvar_rpc_kv_abort_sub_txn_put_bytes("rpc_kv_abort_sub_txn_put_bytes",{"instance_id"});
 // abort_txn_with_coordinator
 mBvarInt64Adder 
g_bvar_rpc_kv_abort_txn_with_coordinator_get_bytes("rpc_kv_abort_txn_with_coordinator_get_bytes",{"instance_id"});
+// get_prepare_txn_by_coordinator
+mBvarInt64Adder 
g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_bytes("rpc_kv_get_prepare_txn_by_coordinator_get_bytes",{"instance_id"});
 // check_txn_conflict
 mBvarInt64Adder 
g_bvar_rpc_kv_check_txn_conflict_get_bytes("rpc_kv_check_txn_conflict_get_bytes",{"instance_id"});
 // clean_txn_label
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 25e324b3233..798f8f82433 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -553,6 +553,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
 extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator;
 extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label;
@@ -869,6 +870,7 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_begin_sub_txn_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_with_coordinator_get_counter;
+extern mBvarInt64Adder 
g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_check_txn_conflict_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_put_counter;
@@ -1007,6 +1009,7 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_begin_sub_txn_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_sub_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_with_coordinator_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_get_prepare_txn_by_coordinator_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_check_txn_conflict_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_put_bytes;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index c801d438faf..e6747f6326d 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -144,6 +144,11 @@ public:
                                     AbortTxnWithCoordinatorResponse* response,
                                     ::google::protobuf::Closure* done) 
override;
 
+    void get_prepare_txn_by_coordinator(::google::protobuf::RpcController* 
controller,
+                                        const 
GetPrepareTxnByCoordinatorRequest* request,
+                                        GetPrepareTxnByCoordinatorResponse* 
response,
+                                        ::google::protobuf::Closure* done) 
override;
+
     void clean_txn_label(::google::protobuf::RpcController* controller,
                          const CleanTxnLabelRequest* request, 
CleanTxnLabelResponse* response,
                          ::google::protobuf::Closure* done) override;
@@ -592,6 +597,14 @@ public:
                   done);
     }
 
+    void get_prepare_txn_by_coordinator(::google::protobuf::RpcController* 
controller,
+                                        const 
GetPrepareTxnByCoordinatorRequest* request,
+                                        GetPrepareTxnByCoordinatorResponse* 
response,
+                                        ::google::protobuf::Closure* done) 
override {
+        call_impl(&cloud::MetaService::get_prepare_txn_by_coordinator, 
controller, request,
+                  response, done);
+    }
+
     void clean_txn_label(::google::protobuf::RpcController* controller,
                          const CleanTxnLabelRequest* request, 
CleanTxnLabelResponse* response,
                          ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index ab6e2fedfe5..9b1cfc49836 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3896,6 +3896,91 @@ void 
MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcControll
     }
 }
 
+void MetaServiceImpl::get_prepare_txn_by_coordinator(
+        ::google::protobuf::RpcController* controller,
+        const GetPrepareTxnByCoordinatorRequest* request,
+        GetPrepareTxnByCoordinatorResponse* response, 
::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(get_prepare_txn_by_coordinator, get);
+    if (!request->has_id() || !request->has_ip()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid coordinate id or coordinate ip.";
+        return;
+    }
+    // TODO: For auth
+    std::string cloud_unique_id = request->has_cloud_unique_id() ? 
request->cloud_unique_id() : "";
+    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "cannot find instance_id with cloud_unique_id="
+           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+        msg = ss.str();
+        return;
+    }
+    RPC_RATE_LIMIT(get_prepare_txn_by_coordinator);
+    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
+    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, 
INT64_MAX});
+    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" 
<< hex(end_info_key);
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+    std::unique_ptr<RangeGetIterator> it;
+    int32_t result_count = 0;
+    int64_t total_iteration_cnt = 0;
+    bool has_start_time_filter = request->has_start_time();
+
+    do {
+        err = txn->get(begin_info_key, end_info_key, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get txn info. err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        while (it->has_next()) {
+            total_iteration_cnt++;
+            auto [k, v] = it->next();
+            VLOG_DEBUG << "check txn info txn_info_key=" << hex(k);
+            TxnInfoPB info_pb;
+            if (!info_pb.ParseFromArray(v.data(), v.size())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "malformed txn running info";
+                msg = ss.str();
+                ss << " key=" << hex(k);
+                LOG(WARNING) << ss.str();
+                return;
+            }
+            const auto& coordinate = info_pb.coordinator();
+            bool matches = info_pb.status() == 
TxnStatusPB::TXN_STATUS_PREPARED &&
+                           coordinate.sourcetype() == TXN_SOURCE_TYPE_BE &&
+                           coordinate.ip() == request->ip() &&
+                           (coordinate.id() == 0 || coordinate.id() == 
request->id());
+            if (matches && has_start_time_filter) {
+                matches = coordinate.start_time() < request->start_time();
+            }
+
+            if (matches) {
+                TxnInfoPB* txn_info = response->add_txn_infos();
+                txn_info->CopyFrom(info_pb);
+                result_count++;
+            }
+
+            if (!it->has_next()) {
+                begin_info_key = k;
+            }
+        }
+        begin_info_key.push_back('\x00'); // Update to next smallest key for 
iteration
+    } while (it->more());
+
+    LOG(INFO) << "get_prepare_txn_by_coordinator: found " << result_count << " 
transactions"
+              << " total iteration count: " << total_iteration_cnt;
+}
+
 std::string get_txn_info_key_from_txn_running_key(std::string_view 
txn_running_key) {
     std::string conflict_txn_info_key;
     std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index e93961a8091..f9197fd5d42 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -2742,6 +2742,115 @@ TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) {
     ASSERT_EQ(check_txn_conflict_res.conflict_txns_size(), 0);
 }
 
+TEST(MetaServiceTest, GetPrepareTxnByCoordinatorTest) {
+    auto meta_service = get_meta_service();
+
+    const int64_t db_id = 888;
+    const int64_t table_id = 999;
+    const std::string cloud_unique_id = "test_cloud_unique_id";
+    const int64_t coordinator_id = 12345;
+    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
+                               
std::chrono::steady_clock::now().time_since_epoch())
+                               .count();
+    std::string host = "127.0.0.1:9050";
+
+    // Create multiple transactions with the same coordinator
+    std::vector<int64_t> txn_ids;
+    for (int i = 0; i < 5; ++i) {
+        brpc::Controller begin_txn_cntl;
+        BeginTxnRequest begin_txn_req;
+        BeginTxnResponse begin_txn_res;
+        TxnInfoPB txn_info_pb;
+        TxnCoordinatorPB coordinator;
+
+        begin_txn_req.set_cloud_unique_id(cloud_unique_id);
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label_" + std::to_string(i));
+        txn_info_pb.add_table_ids(table_id);
+        txn_info_pb.set_timeout_ms(36000);
+        coordinator.set_id(coordinator_id);
+        coordinator.set_ip(host);
+        
coordinator.set_sourcetype(::doris::cloud::TxnSourceTypePB::TXN_SOURCE_TYPE_BE);
+        coordinator.set_start_time(cur_time);
+        txn_info_pb.mutable_coordinator()->CopyFrom(coordinator);
+        begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb);
+
+        meta_service->begin_txn(
+                
reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl),
+                &begin_txn_req, &begin_txn_res, nullptr);
+        ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK);
+        txn_ids.push_back(begin_txn_res.txn_id());
+    }
+
+    // Test 1: Get all prepared transactions without limit
+    {
+        brpc::Controller cntl;
+        GetPrepareTxnByCoordinatorRequest req;
+        GetPrepareTxnByCoordinatorResponse resp;
+
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_id(coordinator_id);
+        req.set_ip(host);
+
+        meta_service->get_prepare_txn_by_coordinator(
+                reinterpret_cast<::google::protobuf::RpcController*>(&cntl), 
&req, &resp, nullptr);
+
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(resp.txn_infos_size(), 5);
+    }
+
+    // Test 2: Get prepared transactions with start_time filter
+    {
+        brpc::Controller cntl;
+        GetPrepareTxnByCoordinatorRequest req;
+        GetPrepareTxnByCoordinatorResponse resp;
+
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_id(coordinator_id);
+        req.set_ip(host);
+        req.set_start_time(cur_time + 100); // Future time, should match all 
transactions
+
+        meta_service->get_prepare_txn_by_coordinator(
+                reinterpret_cast<::google::protobuf::RpcController*>(&cntl), 
&req, &resp, nullptr);
+
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(resp.txn_infos_size(), 5);
+    }
+
+    // Test 3: Get prepared transactions with invalid coordinator
+    {
+        brpc::Controller cntl;
+        GetPrepareTxnByCoordinatorRequest req;
+        GetPrepareTxnByCoordinatorResponse resp;
+
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_id(99999); // Non-existent coordinator
+        req.set_ip("192.168.1.1:9999");
+
+        meta_service->get_prepare_txn_by_coordinator(
+                reinterpret_cast<::google::protobuf::RpcController*>(&cntl), 
&req, &resp, nullptr);
+
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+        ASSERT_EQ(resp.txn_infos_size(), 0); // Should return empty list
+    }
+
+    // Test 4: Invalid request without coordinator id
+    {
+        brpc::Controller cntl;
+        GetPrepareTxnByCoordinatorRequest req;
+        GetPrepareTxnByCoordinatorResponse resp;
+
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_ip(host);
+        // Missing coordinator id
+
+        meta_service->get_prepare_txn_by_coordinator(
+                reinterpret_cast<::google::protobuf::RpcController*>(&cntl), 
&req, &resp, nullptr);
+
+        ASSERT_EQ(resp.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+    }
+}
+
 TEST(MetaServiceTest, CheckTxnConflictTest) {
     auto meta_service = get_meta_service();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 3f0fe453f96..a27a823c7c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -486,6 +486,19 @@ public class MetaServiceClient {
                 .abortTxnWithCoordinator(request);
     }
 
+    public Cloud.GetPrepareTxnByCoordinatorResponse
+            getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest 
request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.GetPrepareTxnByCoordinatorRequest.Builder builder =
+                    Cloud.GetPrepareTxnByCoordinatorRequest.newBuilder();
+            builder.mergeFrom(request);
+            return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
+                    
.getPrepareTxnByCoordinator(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
+                .getPrepareTxnByCoordinator(request);
+    }
+
     public Cloud.FinishTabletJobResponse
             finishTabletJob(Cloud.FinishTabletJobRequest request) {
         if (!request.hasCloudUniqueId()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 567d99c1939..60bef07b638 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -465,6 +465,11 @@ public class MetaServiceProxy {
         return w.executeRequest((client) -> 
client.abortTxnWithCoordinator(request));
     }
 
+    public Cloud.GetPrepareTxnByCoordinatorResponse
+            getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest 
request) throws RpcException {
+        return w.executeRequest((client) -> 
client.getPrepareTxnByCoordinator(request));
+    }
+
     public Cloud.CreateInstanceResponse 
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
         return w.executeRequest((client) -> client.createInstance(request));
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c791ccdd27e..b1ccee66f51 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -32,8 +32,6 @@ import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
-import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorRequest;
-import org.apache.doris.cloud.proto.Cloud.AbortTxnWithCoordinatorResponse;
 import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
@@ -48,6 +46,8 @@ import 
org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
 import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
+import org.apache.doris.cloud.proto.Cloud.GetPrepareTxnByCoordinatorRequest;
+import org.apache.doris.cloud.proto.Cloud.GetPrepareTxnByCoordinatorResponse;
 import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest;
 import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse;
 import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
@@ -2132,48 +2132,69 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         // do nothing in cloud mode
     }
 
-    @Override
-    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String 
coordinateHost, long beStartTime) {
-        AbortTxnWithCoordinatorRequest.Builder builder = 
AbortTxnWithCoordinatorRequest.newBuilder()
+    private List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long 
coordinateBeId,
+            String coordinateHost, long beStartTime) {
+        List<Pair<Long, Long>> txnInfos = new ArrayList<>();
+        GetPrepareTxnByCoordinatorRequest.Builder builder = 
GetPrepareTxnByCoordinatorRequest.newBuilder()
                 .setRequestIp(FrontendOptions.getLocalHostAddressCached());
         builder.setIp(coordinateHost);
         builder.setId(coordinateBeId);
-        builder.setStartTime(beStartTime);
-        final AbortTxnWithCoordinatorRequest request = builder.build();
-        AbortTxnWithCoordinatorResponse response = null;
+        if (beStartTime > 0) {
+            builder.setStartTime(beStartTime);
+        }
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+
+        final GetPrepareTxnByCoordinatorRequest request = builder.build();
+        GetPrepareTxnByCoordinatorResponse response = null;
         try {
-            response = MetaServiceProxy
-                .getInstance().abortTxnWithCoordinator(request);
-            LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
-            if 
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
-                LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow 
enabled, sleep 15s");
-                try {
-                    Thread.sleep(15 * 1000);
-                } catch (InterruptedException ie) {
-                    LOG.info("error ", ie);
+            response = 
MetaServiceProxy.getInstance().getPrepareTxnByCoordinator(request);
+            if (response.getStatus().getCode() == MetaServiceCode.OK) {
+                for (TxnInfoPB txnInfo : response.getTxnInfosList()) {
+                    txnInfos.add(Pair.of(txnInfo.getDbId(), 
txnInfo.getTxnId()));
                 }
+            } else {
+                LOG.warn("Get prepare txn by coordinator BE {} failed, 
code={}, msg={}",
+                        coordinateHost, response.getStatus().getCode(), 
response.getStatus().getMsg());
             }
         } catch (RpcException e) {
-            LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
+            LOG.warn("Get prepare txn by coordinator BE {} failed, msg={}", 
coordinateHost, e.getMessage());
+        }
+        return txnInfos;
+    }
+
+    private void abortTransactionsByCoordinateBe(
+                List<Pair<Long, Long>> transactions, String coordinateHost, 
String reason) {
+        for (Pair<Long, Long> txnInfo : transactions) {
+            try {
+                abortTransaction(txnInfo.first, txnInfo.second, reason);
+            } catch (UserException e) {
+                LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
+            }
         }
     }
 
     @Override
-    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String 
coordinateHost, int limit) {
-        AbortTxnWithCoordinatorRequest.Builder builder = 
AbortTxnWithCoordinatorRequest.newBuilder();
-        builder.setIp(coordinateHost);
-        builder.setId(coordinateBeId);
-        final AbortTxnWithCoordinatorRequest request = builder.build();
-        AbortTxnWithCoordinatorResponse response = null;
-        try {
-            response = MetaServiceProxy
-                .getInstance().abortTxnWithCoordinator(request);
-            LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
-        } catch (RpcException e) {
-            LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
+    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String 
coordinateHost, long beStartTime) {
+        List<Pair<Long, Long>> transactionIdByCoordinateBe
+                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, beStartTime);
+        abortTransactionsByCoordinateBe(transactionIdByCoordinateBe, 
coordinateHost, "coordinate BE restart");
+        if 
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
+            LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow 
enabled, sleep 15s");
+            try {
+                Thread.sleep(15 * 1000);
+            } catch (InterruptedException ie) {
+                LOG.info("error ", ie);
+            }
         }
     }
 
+    @Override
+    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String 
coordinateHost, int limit) {
+        List<Pair<Long, Long>> transactionIdByCoordinateBe
+                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, 0);
+        abortTransactionsByCoordinateBe(transactionIdByCoordinateBe, 
coordinateHost, "coordinate BE is down");
+    }
+
     @Override
     public TransactionStatus getLabelState(long dbId, String label) throws 
AnalysisException {
         GetTxnRequest.Builder builder = GetTxnRequest.newBuilder()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index aa8c2ccc63c..9ed0ddf4c38 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -28,8 +28,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.service.ExecuteEnv;
-import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -205,11 +204,17 @@ public abstract class RoutineLoadTaskInfo {
         // begin a txn for task
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         try {
+            TxnCoordinator coordinator;
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend != null) {
+                long startTime = backend.getLastStartTime();
+                coordinator = new TxnCoordinator(TxnSourceType.BE, beId, 
backend.getHost(), startTime);
+            } else {
+                throw new UserException("Backend not found for beId: " + beId);
+            }
             txnId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
                     Lists.newArrayList(routineLoadJob.getTableId()), 
DebugUtil.printId(id), null,
-                    new TxnCoordinator(TxnSourceType.FE, 0,
-                            FrontendOptions.getLocalHostAddress(),
-                            ExecuteEnv.getInstance().getStartupTime()),
+                    coordinator,
                     TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, 
routineLoadJob.getId(),
                     timeoutMs / 1000);
         } catch (DuplicatedRequestException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 077db66712a..4500ce6da0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -193,9 +193,18 @@ public class HeartbeatMgr extends MasterDaemon {
                 if (be != null) {
                     long oldStartTime = be.getLastStartTime();
                     boolean isChanged = be.handleHbResponse(hbResponse, 
isReplay);
+                    if 
(DebugPointUtil.isEnable("HeartbeatMgr.abortTxnWhenCoordinateBeDown")) {
+                        submitAbortTxnTaskByExecutor(() -> 
Env.getCurrentGlobalTransactionMgr()
+                                    .abortTxnWhenCoordinateBeDown(be.getId(), 
be.getHost(), 100), "down");
+                    }
                     if (hbResponse.getStatus() == HbStatus.OK) {
                         long newStartTime = be.getLastStartTime();
                         // oldStartTime > 0 means it is not the first heartbeat
+                        if 
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart")) {
+                            submitAbortTxnTaskByExecutor(() -> 
Env.getCurrentGlobalTransactionMgr()
+                                    
.abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime + 1000),
+                                    "restart");
+                        }
                         if (!isReplay && 
Config.enable_abort_txn_by_checking_coordinator_be
                                 && oldStartTime != newStartTime && 
oldStartTime > 0) {
                             submitAbortTxnTaskByExecutor(() -> 
Env.getCurrentGlobalTransactionMgr()
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index ba0fbd7cc53..08904998a3c 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1107,6 +1107,19 @@ message AbortTxnWithCoordinatorResponse {
     optional MetaServiceResponseStatus status = 1;
 }
 
+message GetPrepareTxnByCoordinatorRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional string ip = 2;
+    optional int64 id = 3;
+    optional int64 start_time = 4;
+    optional string request_ip = 5;
+}
+
+message GetPrepareTxnByCoordinatorResponse {
+    optional MetaServiceResponseStatus status = 1;
+    repeated TxnInfoPB txn_infos = 2;
+}
+
 message CheckTxnConflictRequest {
     optional string cloud_unique_id = 1; // For auth
     optional int64 db_id = 2;
@@ -2203,6 +2216,7 @@ service MetaService {
     rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse);
     rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse);
     rpc abort_txn_with_coordinator(AbortTxnWithCoordinatorRequest) returns 
(AbortTxnWithCoordinatorResponse);
+    rpc get_prepare_txn_by_coordinator(GetPrepareTxnByCoordinatorRequest) 
returns (GetPrepareTxnByCoordinatorResponse);
 
     rpc get_version(GetVersionRequest) returns (GetVersionResponse);
     rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index 9a5e27d2680..a6120241a5e 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -129,7 +129,7 @@ class RoutineLoadTestUtils {
         return count
     }
 
-    static void waitForTaskAbort(Closure sqlRunner, String job, int maxAttempts = 60) {
+    static void waitForTaskAbort(Closure sqlRunner, String job, int maxAttempts = 60, int expectedAbortedTaskNum = 1) {
         def count = 0
         while (true) {
             def res = sqlRunner.call("show routine load for ${job}")
@@ -137,7 +137,7 @@ class RoutineLoadTestUtils {
             logger.info("Routine load statistic: ${statistic}")
             def jsonSlurper = new JsonSlurper()
             def json = jsonSlurper.parseText(res[0][14])
-            if (json.abortedTaskNum > 1) {
+            if (json.abortedTaskNum > expectedAbortedTaskNum) {
                 break
             }
             if (count > maxAttempts) {
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
new file mode 100644
index 00000000000..74e93dc22bd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
@@ -0,0 +1,98 @@
+ // Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for
+// the specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_be_restart","nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "test_routine_load_be_restart",
+                ]
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_be_restart"
+        def job = "test_routine_load_be_restart"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def injection_load_hang = "load.commit_timeout"
+            def injection_abort_txn = "FE.abortTxnWhenCoordinateBeRestart"
+            // test coordinator be restart
+            try {
+                GetDebugPoint().enableDebugPointForAllFEs(injection_load_hang)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+                GetDebugPoint().enableDebugPointForAllFEs(injection_abort_txn)
+                RoutineLoadTestUtils.waitForTaskAbort(runSql, job, 60)
+            } finally {
+                GetDebugPoint().disableDebugPointForAllFEs(injection_abort_txn)
+                GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
+            }
+
+            // test coordinator be down
+            def injection_abort_txn_be_down = "HeartbeatMgr.abortTxnWhenCoordinateBeDown"
+            try {
+                GetDebugPoint().enableDebugPointForAllFEs(injection_load_hang)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+                GetDebugPoint().enableDebugPointForAllFEs(injection_abort_txn_be_down)
+                RoutineLoadTestUtils.waitForTaskAbort(runSql, job, 60, 2)
+            } finally {
+                GetDebugPoint().disableDebugPointForAllFEs(injection_abort_txn_be_down)
+                GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
+            }
+            def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+            logger.info("wait count: " + count)
+        } finally {
+            sql "stop routine load for ${job}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to