This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 815c7e6a58a [fix](txn) Fix coordidator be restart not abort txn
(#35342)
815c7e6a58a is described below
commit 815c7e6a58ad6dcff00bebd233a10fda6ad45ede
Author: yujun <[email protected]>
AuthorDate: Fri Jun 14 19:41:51 2024 +0800
[fix](txn) Fix coordidator be restart not abort txn (#35342)
BUG: FE aborts a coordinator BE's txns only after the BE has been down for
more than 5 min. If the BE restarts within 5 min, its txns are not aborted
until they time out.
FIX: every txn records its coordinator BE's id and start time. When FE detects
that a BE's start time has changed, it aborts that BE's old txns.
---
.../runtime/stream_load/stream_load_executor.cpp | 4 +
.../apache/doris/analysis/NativeInsertStmt.java | 5 +-
.../transaction/CloudGlobalTransactionMgr.java | 7 +-
.../apache/doris/cloud/transaction/TxnUtil.java | 8 +-
.../org/apache/doris/httpv2/rest/LoadAction.java | 6 +
.../main/java/org/apache/doris/load/DeleteJob.java | 6 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 5 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 5 +-
.../load/routineload/RoutineLoadTaskInfo.java | 5 +-
.../doris/load/sync/canal/CanalSyncChannel.java | 7 +-
.../trees/plans/commands/insert/InsertUtils.java | 8 +-
.../plans/commands/insert/OlapInsertExecutor.java | 5 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 8 +-
.../apache/doris/service/FrontendServiceImpl.java | 29 ++++-
.../java/org/apache/doris/system/HeartbeatMgr.java | 13 +-
.../doris/transaction/DatabaseTransactionMgr.java | 7 +-
.../doris/transaction/GlobalTransactionMgr.java | 34 ++++--
.../transaction/GlobalTransactionMgrIface.java | 4 +-
.../apache/doris/transaction/TransactionEntry.java | 5 +-
.../apache/doris/transaction/TransactionState.java | 15 ++-
.../transaction/CloudGlobalTransactionMgrTest.java | 3 +-
.../transaction/DatabaseTransactionMgrTest.java | 9 +-
.../transaction/GlobalTransactionMgrTest.java | 11 +-
.../doris/transaction/TransactionStateTest.java | 11 +-
gensrc/proto/cloud.proto | 2 +
gensrc/thrift/FrontendService.thrift | 2 +
.../org/apache/doris/regression/suite/Suite.groovy | 27 +++++
.../suites/demo_p0/streamLoad_action.groovy | 5 +
.../stream_load/test_coordidator_be_restart.groovy | 135 +++++++++++++++++++++
29 files changed, 337 insertions(+), 54 deletions(-)
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 58621c77a2a..051aca3e130 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -45,6 +45,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "thrift/protocol/TDebugProtocol.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
@@ -174,6 +175,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());
+ request.__set_backend_id(_exec_env->master_info()->backend_id);
TLoadTxnBeginResult result;
Status status;
@@ -309,6 +311,8 @@ void
StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
}
Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+ DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
+
DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);
TLoadTxnCommitRequest request;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index c4ef8f6597f..8a3a0573cc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -59,6 +59,7 @@ import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
@@ -406,7 +407,9 @@ public class NativeInsertStmt extends InsertStmt {
LoadJobSourceType sourceType =
LoadJobSourceType.INSERT_STREAMING;
transactionId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()),
label.getLabelName(),
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 0718bda3433..297ac9f1746 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1160,7 +1160,12 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
@Override
- public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit)
{
+ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String
coordinateHost, long beStartTime) {
+ // do nothing in cloud mode
+ }
+
+ @Override
+ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String
coordinateHost, int limit) {
// do nothing in cloud mode
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 5db50a7da89..7e913bd9a93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -286,15 +286,15 @@ public class TxnUtil {
public static TxnCoordinatorPB txnCoordinatorToPb(TxnCoordinator
txnCoordinator) {
TxnCoordinatorPB.Builder builder = TxnCoordinatorPB.newBuilder();
builder.setSourceType(TxnSourceTypePB.forNumber(txnCoordinator.sourceType.value()));
+ builder.setId(txnCoordinator.id);
builder.setIp(txnCoordinator.ip);
+ builder.setStartTime(txnCoordinator.startTime);
return builder.build();
}
public static TxnCoordinator txnCoordinatorFromPb(TxnCoordinatorPB
txnCoordinatorPB) {
- TxnCoordinator txnCoordinator = new TxnCoordinator();
- txnCoordinator.sourceType =
TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber());
- txnCoordinator.ip = txnCoordinatorPB.getIp();
- return txnCoordinator;
+ return new
TxnCoordinator(TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber()),
+ txnCoordinatorPB.getId(), txnCoordinatorPB.getIp(),
txnCoordinatorPB.getStartTime());
}
public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 0cc18e7c73d..5767e303f35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
@@ -353,6 +354,11 @@ public class LoadAction extends RestBaseController {
private TNetworkAddress selectRedirectBackend(HttpServletRequest request,
boolean groupCommit)
throws LoadException {
+ long debugBackendId =
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
-1L);
+ if (debugBackendId != -1L) {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(debugBackendId);
+ return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
+ }
if (Config.isCloudMode()) {
String cloudClusterName = getCloudClusterName(request);
if (Strings.isNullOrEmpty(cloudClusterName)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index c766f107aa1..f3915bc6f4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
@@ -284,8 +285,9 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
public long beginTxn() throws Exception {
long txnId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
Lists.newArrayList(deleteInfo.getTableId()), label, null,
- new TransactionState.TxnCoordinator(
- TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, id,
Config.stream_load_default_timeout_second);
this.transactionId = txnId;
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 2ffe85bb36a..fbe7db45bbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -50,6 +50,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
@@ -123,7 +124,9 @@ public class BrokerLoadJob extends BulkLoadJob {
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
getTimeout());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 61687e1c4b4..1d1dc426fda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -62,6 +62,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
@@ -199,7 +200,9 @@ public class SparkLoadJob extends BulkLoadJob {
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index cdee942f408..d101d98cf85 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
@@ -199,7 +200,9 @@ public abstract class RoutineLoadTaskInfo {
try {
txnId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()),
DebugUtil.printId(id), null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK,
routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 3126984d33a..2aea7bad800 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
@@ -132,8 +133,10 @@ public class CanalSyncChannel extends SyncChannel {
try {
long txnId =
globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
- FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
+ sourceType, timeoutSecond);
String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 32121b9833b..c452a242dcc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -58,6 +58,7 @@ import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@@ -193,9 +194,10 @@ public class InsertUtils {
String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
- label, new TransactionState.TxnCoordinator(
- TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label,
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
} else {
MasterTxnExecutor masterTxnExecutor = new
MasterTxnExecutor(ctx);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 2ed8bae8c3e..b4f5503ed44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -45,6 +45,7 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TOlapTableLocationParam;
@@ -96,7 +97,9 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
try {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()),
labelName,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " +
e.getMessage(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6fb5a062c07..b51148194eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -189,6 +189,7 @@ import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
@@ -2199,9 +2200,10 @@ public class StmtExecutor {
String label = txnEntry.getLabel();
if (Env.getCurrentEnv().isMaster()) {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
- label, new TransactionState.TxnCoordinator(
- TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label,
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e05b126ca3b..76a191dfbbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1217,7 +1217,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl,
TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
- TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE,
clientIp);
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+ long startTime = backend != null ? backend.getLastStartTime() : 0;
+ TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE,
request.getBackendId(), clientIp, startTime);
if (request.isSetToken()) {
txnCoord.isFromInternal = true;
}
@@ -1325,10 +1327,12 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+ long startTime = backend != null ? backend.getLastStartTime() : 0;
+ TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE,
request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- db.getId(), tableIdList, request.getLabel(),
request.getRequestId(),
- new TxnCoordinator(TxnSourceType.BE, clientIp),
+ db.getId(), tableIdList, request.getLabel(),
request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1,
timeoutSecond);
// step 7: return result
@@ -2105,6 +2109,25 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
httpStreamParams.getParams().setNumLocalSink(1);
+
+ TransactionState txnState =
Env.getCurrentGlobalTransactionMgr().getTransactionState(
+ httpStreamParams.getDb().getId(),
httpStreamParams.getTxnId());
+ if (txnState == null) {
+ LOG.warn("Not found http stream related txn, txn id = {}",
httpStreamParams.getTxnId());
+ } else {
+ TxnCoordinator txnCoord = txnState.getCoordinator();
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+ if (backend != null) {
+ // only modify txnCoord in memory, not write editlog yet.
+ txnCoord.sourceType = TxnSourceType.BE;
+ txnCoord.id = backend.getId();
+ txnCoord.ip = backend.getHost();
+ txnCoord.startTime = backend.getLastStartTime();
+ LOG.info("Change http stream related txn {} to coordinator
{}",
+ httpStreamParams.getTxnId(), txnCoord);
+ }
+ }
+
result.setPipelineParams(httpStreamParams.getParams());
result.getPipelineParams().setDbName(httpStreamParams.getDb().getFullName());
result.getPipelineParams().setTableName(httpStreamParams.getTable().getName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 8bf20844ea1..5f49e88ba6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -170,14 +170,21 @@ public class HeartbeatMgr extends MasterDaemon {
BackendHbResponse hbResponse = (BackendHbResponse) response;
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
+ long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse,
isReplay);
- if (hbResponse.getStatus() != HbStatus.OK) {
+ if (hbResponse.getStatus() == HbStatus.OK) {
+ long newStartTime = be.getLastStartTime();
+ if (!isReplay && oldStartTime != newStartTime) {
+
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
+ be.getId(), be.getHost(), newStartTime);
+ }
+ } else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new
TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay && System.currentTimeMillis() -
be.getLastUpdateMs()
>=
Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
- Env.getCurrentGlobalTransactionMgr()
-
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
+
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
+ be.getId(), be.getHost(), 100);
}
}
return isChanged;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index a5c23a63a7a..3d4bddbc660 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -2036,13 +2036,16 @@ public class DatabaseTransactionMgr {
return null;
}
- public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String
coordinateHost, int limit) {
+ public List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long
coordinateBeId,
+ String coordinateHost, int limit) {
ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
readLock();
try {
idToRunningTransactionState.values().stream()
.filter(t -> (t.getCoordinator().sourceType ==
TransactionState.TxnSourceType.BE
- && t.getCoordinator().ip.equals(coordinateHost)))
+ && t.getTransactionStatus() ==
TransactionStatus.PREPARE
+ && t.getCoordinator().ip.equals(coordinateHost)
+ && (t.getCoordinator().id == 0 ||
t.getCoordinator().id == coordinateBeId)))
.limit(limit)
.forEach(t -> txnInfos.add(Pair.of(t.getDbId(),
t.getTransactionId())));
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ae1a467205f..719999b8094 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -566,20 +566,35 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
dbTransactionMgr.updateDatabaseUsedQuotaData(usedQuotaDataBytes);
}
+ @Override
+ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String
coordinateHost, long beStartTime) {
+ List<Pair<Long, Long>> transactionIdByCoordinateBe
+ = getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, Integer.MAX_VALUE);
+ for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
+ try {
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(txnInfo.first);
+ TransactionState transactionState =
dbTransactionMgr.getTransactionState(txnInfo.second);
+ long coordStartTime =
transactionState.getCoordinator().startTime;
+ if (coordStartTime > 0 && coordStartTime < beStartTime) {
+ dbTransactionMgr.abortTransaction(txnInfo.second,
"coordinate BE restart", null);
+ }
+ } catch (UserException e) {
+ LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
+ }
+ }
+ }
+
/**
* If a Coordinate BE is down when running txn, the txn will remain in FE
until killed by timeout
* So when FE identify the Coordinate BE is down, FE should cancel it
initiative
*/
@Override
- public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit)
{
- List<Pair<Long, Long>> transactionIdByCoordinateBe =
getTransactionIdByCoordinateBe(coordinateHost, limit);
+ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String
coordinateHost, int limit) {
+ List<Pair<Long, Long>> transactionIdByCoordinateBe
+ = getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(txnInfo.first);
- TransactionState transactionState =
dbTransactionMgr.getTransactionState(txnInfo.second);
- if (transactionState.getTransactionStatus() ==
TransactionStatus.PRECOMMITTED) {
- continue;
- }
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate
BE is down", null);
} catch (UserException e) {
LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
@@ -763,11 +778,12 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
- @Deprecated
- protected List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String
coordinateHost, int limit) {
+ protected List<Pair<Long, Long>>
getPrepareTransactionIdByCoordinateBe(long coordinateBeId,
+ String coordinateHost, int limit) {
ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
for (DatabaseTransactionMgr databaseTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
-
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost,
limit));
+
txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe(
+ coordinateBeId, coordinateHost, limit));
if (txnInfos.size() > limit) {
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index c8c9ff38037..6fd32c8ee7b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -134,7 +134,9 @@ public interface GlobalTransactionMgrIface extends Writable
{
public void updateDatabaseUsedQuotaData(long dbId, long
usedQuotaDataBytes) throws AnalysisException;
- public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit);
+ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String
coordinateHost, long beStartTime);
+
+ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String
coordinateHost, int limit);
public TransactionStatus getLabelState(long dbId, String label) throws
AnalysisException;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index da28dd8250f..85b7ace42ee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -38,6 +38,7 @@ import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
@@ -203,7 +204,9 @@ public class TransactionEntry {
if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
this.transactionId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), Lists.newArrayList(table.getId()),
label,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
} else {
String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index f628c945b6c..f03457b0f09 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -166,8 +166,14 @@ public class TransactionState implements Writable {
public static class TxnCoordinator {
@SerializedName(value = "sourceType")
public TxnSourceType sourceType;
+ // backendId for backend, 0 for frontend
+ @SerializedName(value = "id")
+ public long id = 0;
@SerializedName(value = "ip")
public String ip;
+ // frontend/backend start time
+ @SerializedName(value = "st")
+ public long startTime = 0;
// True if this txn if created by system(such as writing data to audit
table)
@SerializedName(value = "ii")
public boolean isFromInternal = false;
@@ -175,9 +181,11 @@ public class TransactionState implements Writable {
public TxnCoordinator() {
}
- public TxnCoordinator(TxnSourceType sourceType, String ip) {
+ public TxnCoordinator(TxnSourceType sourceType, long id, String ip,
long startTime) {
this.sourceType = sourceType;
+ this.id = id;
this.ip = ip;
+ this.startTime = startTime;
}
@Override
@@ -319,7 +327,8 @@ public class TransactionState implements Writable {
this.transactionId = -1;
this.label = "";
this.idToTableCommitInfos = Maps.newHashMap();
- this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE,
"127.0.0.1"); // mocked, to avoid NPE
+ // mocked, to avoid NPE
+ this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0,
"127.0.0.1", System.currentTimeMillis());
this.transactionStatus = TransactionStatus.PREPARE;
this.sourceType = LoadJobSourceType.FRONTEND;
this.prepareTime = -1;
@@ -758,7 +767,7 @@ public class TransactionState implements Writable {
TableCommitInfo info = TableCommitInfo.read(in);
idToTableCommitInfos.put(info.getTableId(), info);
}
- txnCoordinator = new
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in));
+ txnCoordinator = new
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0);
transactionStatus = TransactionStatus.valueOf(in.readInt());
sourceType = LoadJobSourceType.valueOf(in.readInt());
prepareTime = in.readLong();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index a1aa78d2595..f62078c3050 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -61,7 +61,8 @@ public class CloudGlobalTransactionMgrTest {
private static GlobalTransactionMgrIface masterTransMgr;
private static Env masterEnv;
- private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+ private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.FE, 0, "localfe",
System.currentTimeMillis());
@Before
public void setUp() throws InstantiationException, IllegalAccessException,
IllegalArgumentException,
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index c20464f089d..74121e2d51b 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -67,8 +67,8 @@ public class DatabaseTransactionMgrTest {
private static Env slaveEnv;
private static Map<String, Long> LabelToTxnId;
- private TransactionState.TxnCoordinator transactionSource =
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+ private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.FE, 0, "localfe",
System.currentTimeMillis());
public static void setTransactionFinishPublish(TransactionState
transactionState, List<Long> backendIds) {
if (transactionState.getSubTransactionStates() != null) {
@@ -141,7 +141,8 @@ public class DatabaseTransactionMgrTest {
labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
// txn 2, 3, 4
- TransactionState.TxnCoordinator beTransactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
+ TransactionState.TxnCoordinator beTransactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.BE, 0, "be1",
System.currentTimeMillis());
long transactionId2 =
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLabel2,
beTransactionSource,
@@ -221,7 +222,7 @@ public class DatabaseTransactionMgrTest {
@Test
public void testGetTransactionIdByCoordinateBe() throws UserException {
DatabaseTransactionMgr masterDbTransMgr =
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
- List<Pair<Long, Long>> transactionInfoList =
masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10);
+ List<Pair<Long, Long>> transactionInfoList =
masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10);
Assert.assertEquals(3, transactionInfoList.size());
Assert.assertEquals(CatalogTestUtil.testDbId1,
transactionInfoList.get(0).first.longValue());
Assert.assertEquals(TransactionStatus.PREPARE,
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index a2826745021..a5b2b2115eb 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -88,7 +88,8 @@ public class GlobalTransactionMgrTest {
private static Env masterEnv;
private static Env slaveEnv;
- private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+ private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.FE, 0, "localfe",
System.currentTimeMillis());
protected static List<Long> allBackends =
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3);
@@ -302,7 +303,9 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L,
Lists.newArrayList(1L), 1L, "label", null,
- LoadJobSourceType.ROUTINE_LOAD_TASK, new
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+ LoadJobSourceType.ROUTINE_LOAD_TASK,
+ new TxnCoordinator(TxnSourceType.BE, 0, "be1",
System.currentTimeMillis()),
+ routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
@@ -366,7 +369,9 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L,
Lists.newArrayList(1L), 1L, "label", null,
- LoadJobSourceType.ROUTINE_LOAD_TASK, new
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+ LoadJobSourceType.ROUTINE_LOAD_TASK,
+ new TxnCoordinator(TxnSourceType.BE, 0, "be1",
System.currentTimeMillis()),
+ routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
index 2c038aaff37..55faca2a789 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
@@ -88,8 +88,9 @@ public class TransactionStateTest {
UUID uuid = UUID.randomUUID();
TransactionState transactionState = new TransactionState(1000L,
Lists.newArrayList(20000L, 20001L),
3000, "label123", new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()),
- LoadJobSourceType.BACKEND_STREAMING, new
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L,
- 60 * 1000L);
+ LoadJobSourceType.BACKEND_STREAMING,
+ new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1",
System.currentTimeMillis()),
+ 50000L, 60 * 1000L);
testSerDe(fileName, transactionState, readTransactionState -> {
Assert.assertEquals(transactionState.getCoordinator().ip,
readTransactionState.getCoordinator().ip);
});
@@ -112,7 +113,8 @@ public class TransactionStateTest {
// TransactionState
TransactionState transactionState = new TransactionState(1000L,
Lists.newArrayList(20000L, 20001L), 3000,
"label123", new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()),
- LoadJobSourceType.BACKEND_STREAMING, new
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"),
+ LoadJobSourceType.BACKEND_STREAMING,
+ new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1",
System.currentTimeMillis()),
TransactionStatus.COMMITTED, "", 100, 50000L,
loadJobFinalOperation, 100, 200, 300, 400);
// check
testSerDe(fileName2, transactionState, readTransactionState -> {
@@ -144,7 +146,8 @@ public class TransactionStateTest {
// TransactionState
TransactionState transactionState = new TransactionState(1000L,
Lists.newArrayList(20000L, 20001L),
3000, "label123", new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()),
- LoadJobSourceType.BACKEND_STREAMING, new
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"),
+ LoadJobSourceType.BACKEND_STREAMING,
+ new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1",
System.currentTimeMillis()),
TransactionStatus.COMMITTED, "", 100, 50000L,
attachment, 100, 200, 300, 400);
// check
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index e1c3c9be5ab..f285ad3f260 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -270,6 +270,8 @@ enum TxnStatusPB {
message TxnCoordinatorPB {
optional TxnSourceTypePB sourceType = 1;
optional string ip = 2;
+ optional int64 id = 3;
+ optional int64 start_time = 4;
}
message RoutineLoadProgressPB {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 556443a3d09..7c563a97272 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -614,6 +614,7 @@ struct TLoadTxnBeginRequest {
12: optional string token
13: optional string auth_code_uuid
14: optional i64 table_id
+ 15: optional i64 backend_id
}
struct TLoadTxnBeginResult {
@@ -636,6 +637,7 @@ struct TBeginTxnRequest {
9: optional i64 timeout
10: optional Types.TUniqueId request_id
11: optional string token
+ 12: optional i64 backend_id
}
struct TBeginTxnResult {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 8b408bb7f2f..9b5090bec92 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -902,6 +902,33 @@ class Suite implements GroovyInterceptable {
return hdfs.downLoad(label)
}
+ /**
+ * Creates a two-column demo table (if it does not exist yet) and stream-loads
+ * the demo CSV file into it.
+ *
+ * @param tableName table to create and load into
+ * @param coordidateBeHostPort optional "host:httpPort" of a backend; when
+ * non-empty the load is sent with directToBe to that host and port, when
+ * empty (the default) directToBe is not called
+ */
+ void runStreamLoadExample(String tableName, String coordidateBeHostPort =
"") {
+ // replication_num below is set to the number of rows returned by "show backends"
+ def backends = sql_return_maparray "show backends"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ id int,
+ name varchar(255)
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "${backends.size()}"
+ )
+ """
+
+ streamLoad {
+ table tableName
+ set 'column_separator', ','
+ file context.config.dataPath + "/demo_p0/streamload_input.csv"
+ // NOTE(review): presumably the load timeout in milliseconds -- confirm against StreamLoadAction
+ time 10000
+ if (!coordidateBeHostPort.equals("")) {
+ // split "host:port" at the first ':'; the port part is converted to an integer
+ def pos = coordidateBeHostPort.indexOf(':')
+ def host = coordidateBeHostPort.substring(0, pos)
+ def httpPort = coordidateBeHostPort.substring(pos +
1).toInteger()
+ directToBe host, httpPort
+ }
+ }
+ }
+
void streamLoad(Closure actionSupplier) {
runAction(new StreamLoadAction(context), actionSupplier)
}
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy
b/regression-test/suites/demo_p0/streamLoad_action.groovy
index 733483517d0..59d12c965a2 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++ b/regression-test/suites/demo_p0/streamLoad_action.groovy
@@ -126,6 +126,11 @@ suite("streamLoad_action") {
LIMIT 5;
"""
+ def tableName2 = "test_streamload_action2"
+ runStreamLoadExample(tableName2)
+
sql """ DROP TABLE ${tableName} """
+ sql """ DROP TABLE ${tableName2}"""
+
sql """ DROP TABLE B """
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
new file mode 100644
index 00000000000..bb6b0c18a0d
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+
+// Regression test for coordinator-BE restart handling.
+// It starts two loads that are left in PREPARE, stops the coordinator backend and
+// checks that both transactions are still running, then starts the backend again
+// and checks that both transactions end up ABORTED.
+suite('test_coordidator_be_restart') {
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ // debug points are enabled further down on all FEs and BEs
+ options.enableDebugPoints()
+
+ docker(options) {
+ def db = context.config.getDbNameByFile(context.file)
+ // tableName1 is passed to runStreamLoadExample below, which creates it if missing
+ def tableName1 = 'tbl_test_coordidator_be_restart_t1'
+ // raised to 3600 so that stopping the BE alone does not abort its txns
+ // (see the shutdown step below)
+ setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600)
+
+ def dbId = getDbId()
+
+ def tableName2 = 'tbl_test_coordidator_be_restart_t2'
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ id int,
+ name CHAR(10),
+ dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+ dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+ dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+ dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ """
+
+ // precondition: the database has no running and no finished transactions
+ def txns = sql_return_maparray "show proc
'/transactions/${dbId}/running'"
+ assertEquals(0, txns.size())
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+ assertEquals(0, txns.size())
+
+ // the first backend of the cluster is used as the coordinator BE for this test
+ def coordinatorBe = cluster.getAllBackends().get(0)
+ def coordinatorBeHost = coordinatorBe.host
+
+ // NOTE(review): presumably the first debug point makes the FE pick coordinatorBe when
+ // redirecting loads, and the second blocks the BE before it commits the txn so that
+ // the txns stay in PREPARE -- confirm against LoadAction and StreamLoadExecutor
+
GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId',
[value: coordinatorBe.backendId])
+
GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block')
+
+ // load 1: stream load into tableName1, sent directly to the coordinator BE's http port
+ thread {
+ try {
+ runStreamLoadExample(tableName1, coordinatorBe.host + ':' +
coordinatorBe.httpPort)
+ } catch (NoHttpResponseException t) {
+ // be down will raise NoHttpResponseException
+ }
+ }
+
+ // load 2: http_stream insert into tableName2; any exception is only logged
+ thread {
+ try {
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName2} (id, name) select
c1, c2 from http_stream("format"="csv")
+ """
+ time 120 * 1000
+ file context.config.dataPath +
'/load_p0/http_stream/test_http_stream.csv'
+ }
+ } catch (Exception e) {
+ logger.info('http stream: ' + e)
+ }
+ }
+
+ // fixed wait before checking; presumably long enough for both loads to begin their txns
+ sleep(5000)
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+ logger.info('running txns: ' + txns)
+ // both loads are expected to be running and still in PREPARE
+ assertEquals(2, txns.size())
+ for (def txn : txns) {
+ assertEquals('PREPARE', txn.TransactionStatus)
+ }
+
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+ assertEquals(0, txns.size())
+
+ // coordinatorBe shutdown not abort txn because
abort_txn_after_lost_heartbeat_time_second = 3600
+ cluster.stopBackends(coordinatorBe.index)
+ def isDead = false
+ // poll "show backends" up to 10 times, 1s apart, until the coordinator host is not Alive
+ for (def i = 0; i < 10; i++) {
+ def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
+ if (!be.Alive.toBoolean()) {
+ isDead = true
+ break
+ }
+ sleep 1000
+ }
+ assertTrue(isDead)
+ sleep 5000
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+ logger.info('running txns: ' + txns)
+ // with the BE down, both txns must still be running in PREPARE
+ assertEquals(2, txns.size())
+ for (def txn : txns) {
+ assertEquals('PREPARE', txn.TransactionStatus)
+ }
+
+ // coordinatorBe restart, abort txn on it
+ cluster.startBackends(coordinatorBe.index)
+ def isAlive = false
+ // poll "show backends" up to 20 times, 1s apart, until the coordinator host is Alive again
+ for (def i = 0; i < 20; i++) {
+ def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
+ if (be.Alive.toBoolean()) {
+ isAlive = true
+ break
+ }
+ sleep 1000
+ }
+ assertTrue(isAlive)
+ sleep 5000
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+ logger.info('running txns: ' + txns)
+ // after the restart nothing is running and both txns are finished as ABORTED
+ assertEquals(0, txns.size())
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+ logger.info('finished txns: ' + txns)
+ assertEquals(2, txns.size())
+ for (def txn : txns) {
+ assertEquals('ABORTED', txn.TransactionStatus)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]