This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 815c7e6a58a [fix](txn) Fix coordidator be restart not abort txn 
(#35342)
815c7e6a58a is described below

commit 815c7e6a58ad6dcff00bebd233a10fda6ad45ede
Author: yujun <[email protected]>
AuthorDate: Fri Jun 14 19:41:51 2024 +0800

    [fix](txn) Fix coordidator be restart not abort txn (#35342)
    
    BUG: FE aborts a coordinator BE's txns only after that BE has been down
    for more than 5 min. If the BE restarts within 5 min, its txns are not
    aborted and linger until they time out.
    
    FIX: every txn now records its coordinator BE's id and start time. When
    FE detects that a BE's start time has changed (i.e. the BE restarted), it
    aborts that BE's old txns.
---
 .../runtime/stream_load/stream_load_executor.cpp   |   4 +
 .../apache/doris/analysis/NativeInsertStmt.java    |   5 +-
 .../transaction/CloudGlobalTransactionMgr.java     |   7 +-
 .../apache/doris/cloud/transaction/TxnUtil.java    |   8 +-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |   6 +
 .../main/java/org/apache/doris/load/DeleteJob.java |   6 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   5 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |   5 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |   5 +-
 .../doris/load/sync/canal/CanalSyncChannel.java    |   7 +-
 .../trees/plans/commands/insert/InsertUtils.java   |   8 +-
 .../plans/commands/insert/OlapInsertExecutor.java  |   5 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   8 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  29 ++++-
 .../java/org/apache/doris/system/HeartbeatMgr.java |  13 +-
 .../doris/transaction/DatabaseTransactionMgr.java  |   7 +-
 .../doris/transaction/GlobalTransactionMgr.java    |  34 ++++--
 .../transaction/GlobalTransactionMgrIface.java     |   4 +-
 .../apache/doris/transaction/TransactionEntry.java |   5 +-
 .../apache/doris/transaction/TransactionState.java |  15 ++-
 .../transaction/CloudGlobalTransactionMgrTest.java |   3 +-
 .../transaction/DatabaseTransactionMgrTest.java    |   9 +-
 .../transaction/GlobalTransactionMgrTest.java      |  11 +-
 .../doris/transaction/TransactionStateTest.java    |  11 +-
 gensrc/proto/cloud.proto                           |   2 +
 gensrc/thrift/FrontendService.thrift               |   2 +
 .../org/apache/doris/regression/suite/Suite.groovy |  27 +++++
 .../suites/demo_p0/streamLoad_action.groovy        |   5 +
 .../stream_load/test_coordidator_be_restart.groovy | 135 +++++++++++++++++++++
 29 files changed, 337 insertions(+), 54 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 58621c77a2a..051aca3e130 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -45,6 +45,7 @@
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "thrift/protocol/TDebugProtocol.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/time.h"
@@ -174,6 +175,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
         request.__set_timeout(ctx->timeout_second);
     }
     request.__set_request_id(ctx->id.to_thrift());
+    request.__set_backend_id(_exec_env->master_info()->backend_id);
 
     TLoadTxnBeginResult result;
     Status status;
@@ -309,6 +311,8 @@ void 
StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
 }
 
 Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+    DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
+
     
DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);
 
     TLoadTxnCommitRequest request;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index c4ef8f6597f..8a3a0573cc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -59,6 +59,7 @@ import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.rewrite.ExprRewriter;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TUniqueId;
@@ -406,7 +407,9 @@ public class NativeInsertStmt extends InsertStmt {
                 LoadJobSourceType sourceType = 
LoadJobSourceType.INSERT_STREAMING;
                 transactionId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
                         Lists.newArrayList(targetTable.getId()), 
label.getLabelName(),
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         sourceType, timeoutSecond);
             }
             isTransactionBegin = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 0718bda3433..297ac9f1746 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1160,7 +1160,12 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     @Override
-    public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) 
{
+    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String 
coordinateHost, long beStartTime) {
+        // do nothing in cloud mode
+    }
+
+    @Override
+    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String 
coordinateHost, int limit) {
         // do nothing in cloud mode
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 5db50a7da89..7e913bd9a93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -286,15 +286,15 @@ public class TxnUtil {
     public static TxnCoordinatorPB txnCoordinatorToPb(TxnCoordinator 
txnCoordinator) {
         TxnCoordinatorPB.Builder builder = TxnCoordinatorPB.newBuilder();
         
builder.setSourceType(TxnSourceTypePB.forNumber(txnCoordinator.sourceType.value()));
+        builder.setId(txnCoordinator.id);
         builder.setIp(txnCoordinator.ip);
+        builder.setStartTime(txnCoordinator.startTime);
         return builder.build();
     }
 
     public static TxnCoordinator txnCoordinatorFromPb(TxnCoordinatorPB 
txnCoordinatorPB) {
-        TxnCoordinator txnCoordinator = new TxnCoordinator();
-        txnCoordinator.sourceType = 
TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber());
-        txnCoordinator.ip = txnCoordinatorPB.getIp();
-        return txnCoordinator;
+        return new 
TxnCoordinator(TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber()),
+                txnCoordinatorPB.getId(), txnCoordinatorPB.getIp(), 
txnCoordinatorPB.getStartTime());
     }
 
     public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 0cc18e7c73d..5767e303f35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
@@ -353,6 +354,11 @@ public class LoadAction extends RestBaseController {
 
     private TNetworkAddress selectRedirectBackend(HttpServletRequest request, 
boolean groupCommit)
             throws LoadException {
+        long debugBackendId = 
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
 -1L);
+        if (debugBackendId != -1L) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(debugBackendId);
+            return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
+        }
         if (Config.isCloudMode()) {
             String cloudClusterName = getCloudClusterName(request);
             if (Strings.isNullOrEmpty(cloudClusterName)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index c766f107aa1..f3915bc6f4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPruner;
 import org.apache.doris.planner.RangePartitionPrunerV2;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -284,8 +285,9 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback implements DeleteJ
     public long beginTxn() throws Exception {
         long txnId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
                 Lists.newArrayList(deleteInfo.getTableId()), label, null,
-                new TransactionState.TxnCoordinator(
-                        TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+                        FrontendOptions.getLocalHostAddress(),
+                        ExecuteEnv.getInstance().getStartupTime()),
                 TransactionState.LoadJobSourceType.FRONTEND, id, 
Config.stream_load_default_timeout_second);
         this.transactionId = txnId;
         
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 2ffe85bb36a..fbe7db45bbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -50,6 +50,7 @@ import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
@@ -123,7 +124,9 @@ public class BrokerLoadJob extends BulkLoadJob {
             QuotaExceedException, MetaNotFoundException {
         transactionId = Env.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, 
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
                         getTimeout());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 61687e1c4b4..1d1dc426fda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -62,6 +62,7 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.sparkdpp.DppResult;
 import org.apache.doris.sparkdpp.EtlJobConfig;
@@ -199,7 +200,9 @@ public class SparkLoadJob extends BulkLoadJob {
             QuotaExceedException, MetaNotFoundException {
         transactionId = Env.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, 
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         LoadJobSourceType.FRONTEND, id, getTimeout());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index cdee942f408..d101d98cf85 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
@@ -199,7 +200,9 @@ public abstract class RoutineLoadTaskInfo {
         try {
             txnId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
                     Lists.newArrayList(routineLoadJob.getTableId()), 
DebugUtil.printId(id), null,
-                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    new TxnCoordinator(TxnSourceType.FE, 0,
+                            FrontendOptions.getLocalHostAddress(),
+                            ExecuteEnv.getInstance().getStartupTime()),
                     TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, 
routineLoadJob.getId(),
                     timeoutMs / 1000);
         } catch (DuplicatedRequestException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 3126984d33a..2aea7bad800 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob;
 import org.apache.doris.load.sync.model.Data;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.SyncTask;
 import org.apache.doris.task.SyncTaskPool;
@@ -132,8 +133,10 @@ public class CanalSyncChannel extends SyncChannel {
                 try {
                     long txnId = 
globalTransactionMgr.beginTransaction(db.getId(),
                             Lists.newArrayList(tbl.getId()), label,
-                        new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
-                            FrontendOptions.getLocalHostAddress()), 
sourceType, timeoutSecond);
+                            new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+                                    FrontendOptions.getLocalHostAddress(),
+                                    ExecuteEnv.getInstance().getStartupTime()),
+                            sourceType, timeoutSecond);
                     String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
                     request = new TStreamLoadPutRequest()
                         
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 32121b9833b..c452a242dcc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -58,6 +58,7 @@ import org.apache.doris.qe.InsertStreamTxnExecutor;
 import org.apache.doris.qe.MasterTxnExecutor;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
@@ -193,9 +194,10 @@ public class InsertUtils {
             String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
             if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
                 txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                        txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
-                        label, new TransactionState.TxnCoordinator(
-                                TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), 
label,
+                        new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         sourceType, timeoutSecond);
             } else {
                 MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(ctx);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 2ed8bae8c3e..b4f5503ed44 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -45,6 +45,7 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TOlapTableLocationParam;
@@ -96,7 +97,9 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
         try {
             this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                     database.getId(), ImmutableList.of(table.getId()), 
labelName,
-                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    new TxnCoordinator(TxnSourceType.FE, 0,
+                            FrontendOptions.getLocalHostAddress(),
+                            ExecuteEnv.getInstance().getStartupTime()),
                     LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
         } catch (Exception e) {
             throw new AnalysisException("begin transaction failed. " + 
e.getMessage(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6fb5a062c07..b51148194eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -189,6 +189,7 @@ import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.util.InternalQueryBuffer;
@@ -2199,9 +2200,10 @@ public class StmtExecutor {
         String label = txnEntry.getLabel();
         if (Env.getCurrentEnv().isMaster()) {
             long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
-                    label, new TransactionState.TxnCoordinator(
-                            TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), 
label,
+                    new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+                            FrontendOptions.getLocalHostAddress(),
+                            ExecuteEnv.getInstance().getStartupTime()),
                     sourceType, timeoutSecond);
             txnConf.setTxnId(txnId);
             String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e05b126ca3b..76a191dfbbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1217,7 +1217,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, 
TableType.OLAP);
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
-        TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, 
clientIp);
+        Backend backend = 
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+        long startTime = backend != null ? backend.getLastStartTime() : 0;
+        TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, 
request.getBackendId(), clientIp, startTime);
         if (request.isSetToken()) {
             txnCoord.isFromInternal = true;
         }
@@ -1325,10 +1327,12 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         // step 5: get timeout
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
 
+        Backend backend = 
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+        long startTime = backend != null ? backend.getLastStartTime() : 0;
+        TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, 
request.getBackendId(), clientIp, startTime);
         // step 6: begin transaction
         long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                db.getId(), tableIdList, request.getLabel(), 
request.getRequestId(),
-                new TxnCoordinator(TxnSourceType.BE, clientIp),
+                db.getId(), tableIdList, request.getLabel(), 
request.getRequestId(), txnCoord,
                 TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, 
timeoutSecond);
 
         // step 7: return result
@@ -2105,6 +2109,25 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             
httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
             
httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
             httpStreamParams.getParams().setNumLocalSink(1);
+
+            TransactionState txnState = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(
+                    httpStreamParams.getDb().getId(), 
httpStreamParams.getTxnId());
+            if (txnState == null) {
+                LOG.warn("Not found http stream related txn, txn id = {}", 
httpStreamParams.getTxnId());
+            } else {
+                TxnCoordinator txnCoord = txnState.getCoordinator();
+                Backend backend = 
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+                if (backend != null) {
+                    // only modify txnCoord in memory, not write editlog yet.
+                    txnCoord.sourceType = TxnSourceType.BE;
+                    txnCoord.id = backend.getId();
+                    txnCoord.ip = backend.getHost();
+                    txnCoord.startTime = backend.getLastStartTime();
+                    LOG.info("Change http stream related txn {} to coordinator 
{}",
+                            httpStreamParams.getTxnId(), txnCoord);
+                }
+            }
+
             result.setPipelineParams(httpStreamParams.getParams());
             
result.getPipelineParams().setDbName(httpStreamParams.getDb().getFullName());
             
result.getPipelineParams().setTableName(httpStreamParams.getTable().getName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 8bf20844ea1..5f49e88ba6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -170,14 +170,21 @@ public class HeartbeatMgr extends MasterDaemon {
                 BackendHbResponse hbResponse = (BackendHbResponse) response;
                 Backend be = nodeMgr.getBackend(hbResponse.getBeId());
                 if (be != null) {
+                    long oldStartTime = be.getLastStartTime();
                     boolean isChanged = be.handleHbResponse(hbResponse, 
isReplay);
-                    if (hbResponse.getStatus() != HbStatus.OK) {
+                    if (hbResponse.getStatus() == HbStatus.OK) {
+                        long newStartTime = be.getLastStartTime();
+                        if (!isReplay && oldStartTime != newStartTime) {
+                            
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
+                                    be.getId(), be.getHost(), newStartTime);
+                        }
+                    } else {
                         // invalid all connections cached in ClientPool
                         ClientPool.backendPool.clearPool(new 
TNetworkAddress(be.getHost(), be.getBePort()));
                         if (!isReplay && System.currentTimeMillis() - 
be.getLastUpdateMs()
                                 >= 
Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
-                            Env.getCurrentGlobalTransactionMgr()
-                                    
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
+                            
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
+                                    be.getId(), be.getHost(), 100);
                         }
                     }
                     return isChanged;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index a5c23a63a7a..3d4bddbc660 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -2036,13 +2036,16 @@ public class DatabaseTransactionMgr {
         return null;
     }
 
-    public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String 
coordinateHost, int limit) {
+    public List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long 
coordinateBeId,
+            String coordinateHost, int limit) {
         ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
         readLock();
         try {
             idToRunningTransactionState.values().stream()
                     .filter(t -> (t.getCoordinator().sourceType == 
TransactionState.TxnSourceType.BE
-                            && t.getCoordinator().ip.equals(coordinateHost)))
+                            && t.getTransactionStatus() == 
TransactionStatus.PREPARE
+                            && t.getCoordinator().ip.equals(coordinateHost)
+                            && (t.getCoordinator().id == 0 || 
t.getCoordinator().id == coordinateBeId)))
                     .limit(limit)
                     .forEach(t -> txnInfos.add(Pair.of(t.getDbId(), 
t.getTransactionId())));
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ae1a467205f..719999b8094 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -566,20 +566,35 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         dbTransactionMgr.updateDatabaseUsedQuotaData(usedQuotaDataBytes);
     }
 
+    @Override
+    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String 
coordinateHost, long beStartTime) {
+        List<Pair<Long, Long>> transactionIdByCoordinateBe
+                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, Integer.MAX_VALUE);
+        for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
+            try {
+                DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
+                TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
+                long coordStartTime = 
transactionState.getCoordinator().startTime;
+                if (coordStartTime > 0 && coordStartTime < beStartTime) {
+                    dbTransactionMgr.abortTransaction(txnInfo.second, 
"coordinate BE restart", null);
+                }
+            } catch (UserException e) {
+                LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
+            }
+        }
+    }
+
     /**
      * If a Coordinate BE is down when running txn, the txn will remain in FE 
until killed by timeout
      * So when FE identify the Coordinate BE is down, FE should cancel it 
initiative
      */
     @Override
-    public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) 
{
-        List<Pair<Long, Long>> transactionIdByCoordinateBe = 
getTransactionIdByCoordinateBe(coordinateHost, limit);
+    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String 
coordinateHost, int limit) {
+        List<Pair<Long, Long>> transactionIdByCoordinateBe
+                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, limit);
         for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
             try {
                 DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
-                TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
-                if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
-                    continue;
-                }
                 dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate 
BE is down", null);
             } catch (UserException e) {
                 LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
@@ -763,11 +778,12 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
-    @Deprecated
-    protected List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String 
coordinateHost, int limit) {
+    protected List<Pair<Long, Long>> 
getPrepareTransactionIdByCoordinateBe(long coordinateBeId,
+            String coordinateHost, int limit) {
         ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
         for (DatabaseTransactionMgr databaseTransactionMgr : 
dbIdToDatabaseTransactionMgrs.values()) {
-            
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost,
 limit));
+            
txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe(
+                        coordinateBeId, coordinateHost, limit));
             if (txnInfos.size() > limit) {
                 break;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index c8c9ff38037..6fd32c8ee7b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -134,7 +134,9 @@ public interface GlobalTransactionMgrIface extends Writable 
{
 
     public void updateDatabaseUsedQuotaData(long dbId, long 
usedQuotaDataBytes) throws AnalysisException;
 
-    public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit);
+    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String 
coordinateHost, long beStartTime);
+
+    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String 
coordinateHost, int limit);
 
     public TransactionStatus getLabelState(long dbId, String label) throws 
AnalysisException;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index da28dd8250f..85b7ace42ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -38,6 +38,7 @@ import org.apache.doris.qe.InsertStreamTxnExecutor;
 import org.apache.doris.qe.MasterOpExecutor;
 import org.apache.doris.qe.MasterTxnExecutor;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
@@ -203,7 +204,9 @@ public class TransactionEntry {
             if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                 this.transactionId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(
                         database.getId(), Lists.newArrayList(table.getId()), 
label,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
             } else {
                 String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index f628c945b6c..f03457b0f09 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -166,8 +166,14 @@ public class TransactionState implements Writable {
     public static class TxnCoordinator {
         @SerializedName(value = "sourceType")
         public TxnSourceType sourceType;
+        // backendId for backend, 0 for frontend
+        @SerializedName(value = "id")
+        public long id = 0;
         @SerializedName(value = "ip")
         public String ip;
+        // frontend/backend start time
+        @SerializedName(value = "st")
+        public long startTime = 0;
         // True if this txn if created by system(such as writing data to audit 
table)
         @SerializedName(value = "ii")
         public boolean isFromInternal = false;
@@ -175,9 +181,11 @@ public class TransactionState implements Writable {
         public TxnCoordinator() {
         }
 
-        public TxnCoordinator(TxnSourceType sourceType, String ip) {
+        public TxnCoordinator(TxnSourceType sourceType, long id, String ip, 
long startTime) {
             this.sourceType = sourceType;
+            this.id = id;
             this.ip = ip;
+            this.startTime = startTime;
         }
 
         @Override
@@ -319,7 +327,8 @@ public class TransactionState implements Writable {
         this.transactionId = -1;
         this.label = "";
         this.idToTableCommitInfos = Maps.newHashMap();
-        this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 
"127.0.0.1"); // mocked, to avoid NPE
+        // mocked, to avoid NPE
+        this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, 
"127.0.0.1", System.currentTimeMillis());
         this.transactionStatus = TransactionStatus.PREPARE;
         this.sourceType = LoadJobSourceType.FRONTEND;
         this.prepareTime = -1;
@@ -758,7 +767,7 @@ public class TransactionState implements Writable {
             TableCommitInfo info = TableCommitInfo.read(in);
             idToTableCommitInfos.put(info.getTableId(), info);
         }
-        txnCoordinator = new 
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in));
+        txnCoordinator = new 
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0);
         transactionStatus = TransactionStatus.valueOf(in.readInt());
         sourceType = LoadJobSourceType.valueOf(in.readInt());
         prepareTime = in.readLong();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index a1aa78d2595..f62078c3050 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -61,7 +61,8 @@ public class CloudGlobalTransactionMgrTest {
     private static GlobalTransactionMgrIface masterTransMgr;
     private static Env masterEnv;
 
-    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(
+            TransactionState.TxnSourceType.FE, 0, "localfe", 
System.currentTimeMillis());
 
     @Before
     public void setUp() throws InstantiationException, IllegalAccessException, 
IllegalArgumentException,
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index c20464f089d..74121e2d51b 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -67,8 +67,8 @@ public class DatabaseTransactionMgrTest {
     private static Env slaveEnv;
     private static Map<String, Long> LabelToTxnId;
 
-    private TransactionState.TxnCoordinator transactionSource =
-            new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(
+            TransactionState.TxnSourceType.FE, 0, "localfe", 
System.currentTimeMillis());
 
     public static void setTransactionFinishPublish(TransactionState 
transactionState, List<Long> backendIds) {
         if (transactionState.getSubTransactionStates() != null) {
@@ -141,7 +141,8 @@ public class DatabaseTransactionMgrTest {
         labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
 
         // txn 2, 3, 4
-        TransactionState.TxnCoordinator beTransactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
+        TransactionState.TxnCoordinator beTransactionSource = new 
TransactionState.TxnCoordinator(
+                TransactionState.TxnSourceType.BE, 0, "be1", 
System.currentTimeMillis());
         long transactionId2 = 
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLabel2,
                 beTransactionSource,
@@ -221,7 +222,7 @@ public class DatabaseTransactionMgrTest {
     @Test
     public void testGetTransactionIdByCoordinateBe() throws UserException {
         DatabaseTransactionMgr masterDbTransMgr = 
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
-        List<Pair<Long, Long>> transactionInfoList = 
masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10);
+        List<Pair<Long, Long>> transactionInfoList = 
masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10);
         Assert.assertEquals(3, transactionInfoList.size());
         Assert.assertEquals(CatalogTestUtil.testDbId1, 
transactionInfoList.get(0).first.longValue());
         Assert.assertEquals(TransactionStatus.PREPARE,
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index a2826745021..a5b2b2115eb 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -88,7 +88,8 @@ public class GlobalTransactionMgrTest {
     private static Env masterEnv;
     private static Env slaveEnv;
 
-    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(
+            TransactionState.TxnSourceType.FE, 0, "localfe", 
System.currentTimeMillis());
     protected static List<Long> allBackends = 
Lists.newArrayList(CatalogTestUtil.testBackendId1,
             CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3);
 
@@ -302,7 +303,9 @@ public class GlobalTransactionMgrTest {
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 
Lists.newArrayList(1L), 1L, "label", null,
-                LoadJobSourceType.ROUTINE_LOAD_TASK, new 
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+                LoadJobSourceType.ROUTINE_LOAD_TASK,
+                new TxnCoordinator(TxnSourceType.BE, 0, "be1", 
System.currentTimeMillis()),
+                routineLoadJob.getId(),
                 Config.stream_load_default_timeout_second);
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
         masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
@@ -366,7 +369,9 @@ public class GlobalTransactionMgrTest {
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 
Lists.newArrayList(1L), 1L, "label", null,
-                LoadJobSourceType.ROUTINE_LOAD_TASK, new 
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+                LoadJobSourceType.ROUTINE_LOAD_TASK,
+                new TxnCoordinator(TxnSourceType.BE, 0, "be1", 
System.currentTimeMillis()),
+                routineLoadJob.getId(),
                 Config.stream_load_default_timeout_second);
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
         masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
index 2c038aaff37..55faca2a789 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
@@ -88,8 +88,9 @@ public class TransactionStateTest {
         UUID uuid = UUID.randomUUID();
         TransactionState transactionState = new TransactionState(1000L, 
Lists.newArrayList(20000L, 20001L),
                 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()),
-                LoadJobSourceType.BACKEND_STREAMING, new 
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L,
-                60 * 1000L);
+                LoadJobSourceType.BACKEND_STREAMING,
+                new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", 
System.currentTimeMillis()),
+                50000L, 60 * 1000L);
         testSerDe(fileName, transactionState, readTransactionState -> {
             Assert.assertEquals(transactionState.getCoordinator().ip, 
readTransactionState.getCoordinator().ip);
         });
@@ -112,7 +113,8 @@ public class TransactionStateTest {
         // TransactionState
         TransactionState transactionState = new TransactionState(1000L, 
Lists.newArrayList(20000L, 20001L), 3000,
                 "label123", new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()),
-                LoadJobSourceType.BACKEND_STREAMING, new 
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"),
+                LoadJobSourceType.BACKEND_STREAMING,
+                new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", 
System.currentTimeMillis()),
                 TransactionStatus.COMMITTED, "", 100, 50000L, 
loadJobFinalOperation, 100, 200, 300, 400);
         // check
         testSerDe(fileName2, transactionState, readTransactionState -> {
@@ -144,7 +146,8 @@ public class TransactionStateTest {
         // TransactionState
         TransactionState transactionState = new TransactionState(1000L, 
Lists.newArrayList(20000L, 20001L),
                 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()),
-                LoadJobSourceType.BACKEND_STREAMING, new 
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"),
+                LoadJobSourceType.BACKEND_STREAMING,
+                new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", 
System.currentTimeMillis()),
                 TransactionStatus.COMMITTED, "", 100, 50000L,
                 attachment, 100, 200, 300, 400);
         // check
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index e1c3c9be5ab..f285ad3f260 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -270,6 +270,8 @@ enum TxnStatusPB {
 message TxnCoordinatorPB {
     optional TxnSourceTypePB sourceType = 1;
     optional string ip = 2;
+    optional int64 id = 3;
+    optional int64 start_time = 4;
 }
 
 message RoutineLoadProgressPB {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 556443a3d09..7c563a97272 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -614,6 +614,7 @@ struct TLoadTxnBeginRequest {
     12: optional string token
     13: optional string auth_code_uuid
     14: optional i64 table_id
+    15: optional i64 backend_id
 }
 
 struct TLoadTxnBeginResult {
@@ -636,6 +637,7 @@ struct TBeginTxnRequest {
     9: optional i64 timeout
     10: optional Types.TUniqueId request_id
     11: optional string token
+    12: optional i64 backend_id
 }
 
 struct TBeginTxnResult {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 8b408bb7f2f..9b5090bec92 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -902,6 +902,33 @@ class Suite implements GroovyInterceptable {
         return hdfs.downLoad(label)
     }
 
+    void runStreamLoadExample(String tableName, String coordidateBeHostPort = 
"") {
+        def backends = sql_return_maparray "show backends"
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    id int,
+                    name varchar(255)
+                )
+                DISTRIBUTED BY HASH(id) BUCKETS 1
+                PROPERTIES (
+                  "replication_num" = "${backends.size()}"
+                )
+            """
+
+        streamLoad {
+            table tableName
+            set 'column_separator', ','
+            file context.config.dataPath + "/demo_p0/streamload_input.csv"
+            time 10000
+            if (!coordidateBeHostPort.equals("")) {
+                def pos = coordidateBeHostPort.indexOf(':')
+                def host = coordidateBeHostPort.substring(0, pos)
+                def httpPort = coordidateBeHostPort.substring(pos + 
1).toInteger()
+                directToBe host, httpPort
+            }
+        }
+    }
+
     void streamLoad(Closure actionSupplier) {
         runAction(new StreamLoadAction(context), actionSupplier)
     }
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy 
b/regression-test/suites/demo_p0/streamLoad_action.groovy
index 733483517d0..59d12c965a2 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++ b/regression-test/suites/demo_p0/streamLoad_action.groovy
@@ -126,6 +126,11 @@ suite("streamLoad_action") {
         LIMIT 5;
     """
 
+    def tableName2 = "test_streamload_action2"
+    runStreamLoadExample(tableName2)
+
     sql """ DROP TABLE ${tableName} """
+    sql """ DROP TABLE ${tableName2}"""
+
     sql """ DROP TABLE B """
 }
diff --git 
a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy 
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
new file mode 100644
index 00000000000..bb6b0c18a0d
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+
+suite('test_coordidator_be_restart') {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.enableDebugPoints()
+
+    docker(options) {
+        def db = context.config.getDbNameByFile(context.file)
+        def tableName1 = 'tbl_test_coordidator_be_restart_t1'
+        setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600)
+
+        def dbId = getDbId()
+
+        def tableName2 = 'tbl_test_coordidator_be_restart_t2'
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                id int,
+                name CHAR(10),
+                dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+                dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+                dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+                dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+            )
+            DISTRIBUTED BY HASH(id) BUCKETS 1
+        """
+
+        def txns = sql_return_maparray "show proc 
'/transactions/${dbId}/running'"
+        assertEquals(0, txns.size())
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+        assertEquals(0, txns.size())
+
+        def coordinatorBe = cluster.getAllBackends().get(0)
+        def coordinatorBeHost = coordinatorBe.host
+
+        
GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId',
 [value: coordinatorBe.backendId])
+        
GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block')
+
+        thread {
+            try {
+                runStreamLoadExample(tableName1, coordinatorBe.host + ':' + 
coordinatorBe.httpPort)
+            } catch (NoHttpResponseException t) {
+            // be down  will raise NoHttpResponseException
+            }
+        }
+
+        thread {
+            try {
+                streamLoad {
+                    set 'version', '1'
+                    set 'sql', """
+                            insert into ${db}.${tableName2} (id, name) select 
c1, c2 from http_stream("format"="csv")
+                            """
+                    time 120 * 1000
+                    file context.config.dataPath + 
'/load_p0/http_stream/test_http_stream.csv'
+                }
+            } catch (Exception e) {
+                logger.info('http stream: ' + e)
+            }
+        }
+
+        sleep(5000)
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+        logger.info('running txns: ' + txns)
+        assertEquals(2, txns.size())
+        for (def txn : txns) {
+            assertEquals('PREPARE', txn.TransactionStatus)
+        }
+
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+        assertEquals(0, txns.size())
+
+        // coordinatorBe shutdown not abort txn because 
abort_txn_after_lost_heartbeat_time_second = 3600
+        cluster.stopBackends(coordinatorBe.index)
+        def isDead = false
+        for (def i = 0; i < 10; i++) {
+            def be = sql_return_maparray('show backends').find { it.Host == 
coordinatorBeHost }
+            if (!be.Alive.toBoolean()) {
+                isDead = true
+                break
+            }
+            sleep 1000
+        }
+        assertTrue(isDead)
+        sleep 5000
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+        logger.info('running txns: ' + txns)
+        assertEquals(2, txns.size())
+        for (def txn : txns) {
+            assertEquals('PREPARE', txn.TransactionStatus)
+        }
+
+        // coordinatorBe restart, abort txn on it
+        cluster.startBackends(coordinatorBe.index)
+        def isAlive = false
+        for (def i = 0; i < 20; i++) {
+            def be = sql_return_maparray('show backends').find { it.Host == 
coordinatorBeHost }
+            if (be.Alive.toBoolean()) {
+                isAlive = true
+                break
+            }
+            sleep 1000
+        }
+        assertTrue(isAlive)
+        sleep 5000
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+        logger.info('running txns: ' + txns)
+        assertEquals(0, txns.size())
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+        logger.info('finished txns: ' + txns)
+        assertEquals(2, txns.size())
+        for (def txn : txns) {
+            assertEquals('ABORTED', txn.TransactionStatus)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to