This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new eaffb691d38 [branch-2.0](txn) be dead exceeds 5min abort its txns
(#22781, #28662, #35342) (#39317)
eaffb691d38 is described below
commit eaffb691d38afe0995f073847572564df49b909e
Author: yujun <[email protected]>
AuthorDate: Wed Aug 14 15:31:51 2024 +0800
[branch-2.0](txn) be dead exceeds 5min abort its txns (#22781, #28662,
#35342) (#39317)
cherry-pick: #22781, #28662, #35342
---------
Co-authored-by: HHoflittlefish777
<[email protected]>
---
.../runtime/stream_load/stream_load_executor.cpp | 4 +
docs/en/docs/admin-manual/config/fe-config.md | 10 ++
docs/zh-CN/docs/admin-manual/config/fe-config.md | 10 ++
.../main/java/org/apache/doris/common/Config.java | 14 +++
.../apache/doris/analysis/NativeInsertStmt.java | 5 +-
.../java/org/apache/doris/common/ClientPool.java | 2 +-
.../java/org/apache/doris/common/FeConstants.java | 1 -
.../org/apache/doris/httpv2/rest/LoadAction.java | 6 ++
.../java/org/apache/doris/load/DeleteHandler.java | 5 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 5 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 5 +-
.../load/routineload/RoutineLoadTaskInfo.java | 5 +-
.../doris/load/sync/canal/CanalSyncChannel.java | 7 +-
.../org/apache/doris/nereids/txn/Transaction.java | 5 +-
.../java/org/apache/doris/qe/SimpleScheduler.java | 3 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 8 +-
.../java/org/apache/doris/service/ExecuteEnv.java | 7 ++
.../apache/doris/service/FrontendServiceImpl.java | 12 ++-
.../java/org/apache/doris/system/HeartbeatMgr.java | 18 +++-
.../doris/transaction/DatabaseTransactionMgr.java | 7 +-
.../doris/transaction/GlobalTransactionMgr.java | 32 +++++--
.../apache/doris/transaction/TransactionState.java | 15 ++-
.../apache/doris/clone/BeDownCancelCloneTest.java | 4 +-
.../doris/cluster/DecommissionBackendTest.java | 2 +-
.../org/apache/doris/qe/SimpleSchedulerTest.java | 8 +-
.../transaction/DatabaseTransactionMgrTest.java | 10 +-
.../transaction/GlobalTransactionMgrTest.java | 11 ++-
.../doris/transaction/TransactionStateTest.java | 5 +-
.../apache/doris/utframe/TestWithFeService.java | 2 +-
gensrc/thrift/FrontendService.thrift | 2 +
.../org/apache/doris/regression/suite/Suite.groovy | 62 ++++++++++++
.../suites/demo_p0/streamLoad_action.groovy | 5 +
.../stream_load/test_coordidator_be_restart.groovy | 106 +++++++++++++++++++++
33 files changed, 349 insertions(+), 54 deletions(-)
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 4a80eb35ca6..19d25e9ffa1 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -45,6 +45,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "thrift/protocol/TDebugProtocol.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
@@ -242,6 +243,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());
+ request.__set_backend_id(_exec_env->master_info()->backend_id);
TLoadTxnBeginResult result;
Status status;
@@ -374,6 +376,8 @@ void
StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
}
Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+ DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
+
DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);
TLoadTxnCommitRequest request;
diff --git a/docs/en/docs/admin-manual/config/fe-config.md
b/docs/en/docs/admin-manual/config/fe-config.md
index 1c2ab9939a1..fbab5cfcd15 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -587,6 +587,16 @@ Is it possible to configure dynamically: true
Whether it is a configuration item unique to the Master FE node: true
+#### `abort_txn_after_lost_heartbeat_time_second`
+
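+The time to wait after a BE's heartbeat is lost before aborting the transactions that BE coordinates. The default value is 300, which means that once the FE has not received a heartbeat from a BE for 300 seconds, the transactions coordinated by that BE that are still in the PREPARE state will be aborted.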
+
+Default: 300(s)
+
+Is it possible to configure dynamically: true
+
+Whether it is a configuration item unique to the Master FE node: true
+
#### `enable_access_file_without_broker`
Default:false
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index c93b2d17e01..60e2198b711 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -587,6 +587,16 @@ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。
是否为 Master FE 节点独有的配置项:true
+#### `abort_txn_after_lost_heartbeat_time_second`
+
+丢失 BE 心跳后中止该 BE 所协调事务的等待时间。默认值为 300 秒,即当 FE 连续 300 秒未收到某个 BE 的心跳时,会中止由该 BE 协调且仍处于 PREPARE 状态的事务。
+
+默认值:300(秒)
+
+是否可以动态配置:true
+
+是否为 Master FE 节点独有的配置项:true
+
#### `enable_access_file_without_broker`
默认值:false
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a5c0ad36ad8..44882f21768 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1812,6 +1812,20 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long max_backend_heartbeat_failure_tolerance_count = 1;
+ /**
+ * Abort transaction time after lost heartbeat.
+ * The default value is 300s, which means transactions of be will be
aborted after lost heartbeat 300s.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int abort_txn_after_lost_heartbeat_time_second = 300;
+
+ /**
+ * Heartbeat interval in seconds.
+ * Default is 5, which means every 5 seconds, the master will send a
heartbeat to all backends.
+ */
+ @ConfField(mutable = false, masterOnly = false)
+ public static int heartbeat_interval_second = 5;
+
/**
* The iceberg and hudi table will be removed in v1.3
* Use multi catalog instead.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 9625db3ea5c..f95efcd3474 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -53,6 +53,7 @@ import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
@@ -358,7 +359,9 @@ public class NativeInsertStmt extends InsertStmt {
LoadJobSourceType sourceType =
LoadJobSourceType.INSERT_STREAMING;
transactionId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()),
label.getLabelName(),
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 7308a225402..bdfffbe4802 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -27,7 +27,7 @@ import
org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
public class ClientPool {
static GenericKeyedObjectPoolConfig heartbeatConfig = new
GenericKeyedObjectPoolConfig();
- static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second *
1000;
+ static int heartbeatTimeoutMs = Config.heartbeat_interval_second * 1000;
static GenericKeyedObjectPoolConfig backendConfig = new
GenericKeyedObjectPoolConfig();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 1afa12856f0..2f45d87e895 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -34,7 +34,6 @@ public class FeConstants {
public static int shortkey_max_column_count = 3;
public static int shortkey_maxsize_bytes = 36;
- public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes
// dpp version
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index b358ea60b9a..c021c62f367 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -22,6 +22,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
@@ -229,6 +230,11 @@ public class LoadAction extends RestBaseController {
}
private TNetworkAddress selectRedirectBackend(String clusterName) throws
LoadException {
+ long debugBackendId =
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
-1L);
+ if (debugBackendId != -1L) {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(debugBackendId);
+ return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
+ }
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags =
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index b771eb8c9fa..70b3765227a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -69,6 +69,7 @@ import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
@@ -245,7 +246,9 @@ public class DeleteHandler implements Writable {
// begin txn here and generate txn id
transactionId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(olapTable.getId()), label, null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, jobId,
Config.stream_load_default_timeout_second);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 656b9b9d699..4591bb91b03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -49,6 +49,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
@@ -104,7 +105,9 @@ public class BrokerLoadJob extends BulkLoadJob {
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
getTimeout());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 09a85d3dffe..878a400e281 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
@@ -198,7 +199,9 @@ public class SparkLoadJob extends BulkLoadJob {
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 10d57e66d67..ae2570224c4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
@@ -206,7 +207,9 @@ public abstract class RoutineLoadTaskInfo {
try {
txnId =
Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()),
DebugUtil.printId(id), null,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK,
routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 5b10ecea818..825d63292ef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
@@ -133,8 +134,10 @@ public class CanalSyncChannel extends SyncChannel {
try {
long txnId =
globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
- FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
+ sourceType, timeoutSecond);
String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
index 994f5e36055..db6598a1e1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
@@ -34,6 +34,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
@@ -80,7 +81,9 @@ public class Transaction {
this.coordinator = new Coordinator(ctx, null, planner,
ctx.getStatsErrorEstimator());
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
- new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ new TxnCoordinator(TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
this.createAt = System.currentTimeMillis();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index ace88c8ac0d..48c7744576c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
@@ -176,7 +175,7 @@ public class SimpleScheduler {
return;
}
- blacklistBackends.put(backendID,
Pair.of(FeConstants.heartbeat_interval_second + 1, reason));
+ blacklistBackends.put(backendID,
Pair.of(Config.heartbeat_interval_second + 1, reason));
LOG.warn("add backend {} to black list. reason: {}", backendID,
reason);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 8dda65e81f7..f47121348f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -160,6 +160,7 @@ import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
@@ -1908,9 +1909,10 @@ public class StmtExecutor {
String label = txnEntry.getLabel();
if (Env.getCurrentEnv().isMaster()) {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
- label, new TransactionState.TxnCoordinator(
- TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
+ txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label,
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(),
+ ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
index a7ac522b5bd..ecd544c8bff 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
@@ -26,10 +26,12 @@ public class ExecuteEnv {
private static volatile ExecuteEnv INSTANCE;
private MultiLoadMgr multiLoadMgr;
private ConnectScheduler scheduler;
+ private long startupTime;
private ExecuteEnv() {
multiLoadMgr = new MultiLoadMgr();
scheduler = new ConnectScheduler(Config.qe_max_connection);
+ startupTime = System.currentTimeMillis();
}
public static ExecuteEnv getInstance() {
@@ -50,4 +52,9 @@ public class ExecuteEnv {
public MultiLoadMgr getMultiLoadMgr() {
return multiLoadMgr;
}
+
+ public long getStartupTime() {
+ return startupTime;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 36fdec157ac..a38921221fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1249,10 +1249,12 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl,
TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+ long startTime = backend != null ? backend.getLastStartTime() : 0;
+ TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE,
request.getBackendId(), clientIp, startTime);
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()),
request.getLabel(), request.getRequestId(),
- new TxnCoordinator(TxnSourceType.BE, clientIp),
- TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1,
timeoutSecond);
+ txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
result.setTxnId(txnId).setDbId(db.getId());
return result;
@@ -1356,10 +1358,12 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+ long startTime = backend != null ? backend.getLastStartTime() : 0;
+ TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE,
request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- db.getId(), tableIdList, request.getLabel(),
request.getRequestId(),
- new TxnCoordinator(TxnSourceType.BE, clientIp),
+ db.getId(), tableIdList, request.getLabel(),
request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1,
timeoutSecond);
// step 7: return result
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 7c081c12cd0..f621ae1e322 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -77,7 +77,7 @@ public class HeartbeatMgr extends MasterDaemon {
private static volatile AtomicReference<TMasterInfo> masterInfo = new
AtomicReference<>();
public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric)
{
- super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
+ super("heartbeat mgr", Config.heartbeat_interval_second * 1000);
this.nodeMgr = nodeMgr;
this.executor =
ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size,
"heartbeat-mgr-pool", needRegisterMetric);
@@ -168,13 +168,21 @@ public class HeartbeatMgr extends MasterDaemon {
BackendHbResponse hbResponse = (BackendHbResponse) response;
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
+ long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse,
isReplay);
- if (hbResponse.getStatus() != HbStatus.OK) {
+ if (hbResponse.getStatus() == HbStatus.OK) {
+ long newStartTime = be.getLastStartTime();
+ if (!isReplay && oldStartTime != newStartTime) {
+
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
+ be.getId(), be.getHost(), newStartTime);
+ }
+ } else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new
TNetworkAddress(be.getHost(), be.getBePort()));
- if (!isReplay) {
- Env.getCurrentEnv().getGlobalTransactionMgr()
-
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
+ if (!isReplay && System.currentTimeMillis() -
be.getLastUpdateMs()
+ >=
Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
+
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
+ be.getId(), be.getHost(), 100);
}
}
return isChanged;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 7ef043136ba..d733d863f4e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1733,13 +1733,16 @@ public class DatabaseTransactionMgr {
return null;
}
- public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String
coordinateHost, int limit) {
+ public List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long
coordinateBeId,
+ String coordinateHost, int limit) {
ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
readLock();
try {
idToRunningTransactionState.values().stream()
.filter(t -> (t.getCoordinator().sourceType ==
TransactionState.TxnSourceType.BE
- && t.getCoordinator().ip.equals(coordinateHost)))
+ && t.getTransactionStatus() ==
TransactionStatus.PREPARE
+ && t.getCoordinator().ip.equals(coordinateHost)
+ && (t.getCoordinator().id == 0 ||
t.getCoordinator().id == coordinateBeId)))
.limit(limit)
.forEach(t -> txnInfos.add(Pair.of(t.getDbId(),
t.getTransactionId())));
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 22a019c4c0d..35c2195eb33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -653,10 +653,12 @@ public class GlobalTransactionMgr implements Writable {
}
}
- public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String
coordinateHost, int limit) {
+ private List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long
coordinateBeId,
+ String coordinateHost, int limit) {
ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
for (DatabaseTransactionMgr databaseTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
-
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost,
limit));
+
txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe(
+ coordinateBeId, coordinateHost, limit));
if (txnInfos.size() > limit) {
break;
}
@@ -664,19 +666,33 @@ public class GlobalTransactionMgr implements Writable {
return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0,
limit)) : txnInfos;
}
+ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String
coordinateHost, long beStartTime) {
+ List<Pair<Long, Long>> transactionIdByCoordinateBe
+ = getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, Integer.MAX_VALUE);
+ for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
+ try {
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(txnInfo.first);
+ TransactionState transactionState =
dbTransactionMgr.getTransactionState(txnInfo.second);
+ long coordStartTime =
transactionState.getCoordinator().startTime;
+ if (coordStartTime < beStartTime) {
+ dbTransactionMgr.abortTransaction(txnInfo.second,
"coordinate BE restart", null);
+ }
+ } catch (UserException e) {
+ LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
+ }
+ }
+ }
+
/**
* If a Coordinate BE is down when running txn, the txn will remain in FE
until killed by timeout
* So when FE identify the Coordinate BE is down, FE should cancel it
initiative
*/
- public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit)
{
- List<Pair<Long, Long>> transactionIdByCoordinateBe =
getTransactionIdByCoordinateBe(coordinateHost, limit);
+ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String
coordinateHost, int limit) {
+ List<Pair<Long, Long>> transactionIdByCoordinateBe
+ = getPrepareTransactionIdByCoordinateBe(coordinateBeId,
coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(txnInfo.first);
- TransactionState transactionState =
dbTransactionMgr.getTransactionState(txnInfo.second);
- if (transactionState.getTransactionStatus() ==
TransactionStatus.PRECOMMITTED) {
- continue;
- }
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate
BE is down", null);
} catch (UserException e) {
LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index f9a094eceb9..cdef27c9359 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -162,15 +162,23 @@ public class TransactionState implements Writable {
public static class TxnCoordinator {
@SerializedName(value = "sourceType")
public TxnSourceType sourceType;
+ // backendId for backend, 0 for frontend
+ @SerializedName(value = "id")
+ public long id = 0;
@SerializedName(value = "ip")
public String ip;
+ // frontend/backend start time
+ @SerializedName(value = "st")
+ public long startTime = 0;
public TxnCoordinator() {
}
- public TxnCoordinator(TxnSourceType sourceType, String ip) {
+ public TxnCoordinator(TxnSourceType sourceType, long id, String ip,
long startTime) {
this.sourceType = sourceType;
+ this.id = id;
this.ip = ip;
+ this.startTime = startTime;
}
@Override
@@ -301,7 +309,8 @@ public class TransactionState implements Writable {
this.transactionId = -1;
this.label = "";
this.idToTableCommitInfos = Maps.newHashMap();
- this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE,
"127.0.0.1"); // mocked, to avoid NPE
+ // mocked, to avoid NPE
+ this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0,
"127.0.0.1", System.currentTimeMillis());
this.transactionStatus = TransactionStatus.PREPARE;
this.sourceType = LoadJobSourceType.FRONTEND;
this.prepareTime = -1;
@@ -721,7 +730,7 @@ public class TransactionState implements Writable {
info.readFields(in);
idToTableCommitInfos.put(info.getTableId(), info);
}
- txnCoordinator = new
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in));
+ txnCoordinator = new
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0);
transactionStatus = TransactionStatus.valueOf(in.readInt());
sourceType = LoadJobSourceType.valueOf(in.readInt());
prepareTime = in.readLong();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
index 4a413495e98..e288d046de1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
@@ -54,7 +54,7 @@ public class BeDownCancelCloneTest extends TestWithFeService {
Config.disable_balance = true;
Config.schedule_batch_size = 1000;
Config.schedule_slot_num_per_hdd_path = 1000;
- FeConstants.heartbeat_interval_second = 5;
+ Config.heartbeat_interval_second = 5;
Config.max_backend_heartbeat_failure_tolerance_count = 1;
Config.min_clone_task_timeout_sec = 20 * 60 * 1000;
}
@@ -114,7 +114,7 @@ public class BeDownCancelCloneTest extends
TestWithFeService {
params2.put("deadBeIds", String.valueOf(destBeId));
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler",
params2);
- Thread.sleep((FeConstants.heartbeat_interval_second
+ Thread.sleep((Config.heartbeat_interval_second
* Config.max_backend_heartbeat_failure_tolerance_count + 4) *
1000L);
destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index b917bc41a7e..ff5f5292be8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -66,7 +66,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
Config.disable_balance = true;
Config.schedule_batch_size = 1000;
Config.schedule_slot_num_per_hdd_path = 1000;
- FeConstants.heartbeat_interval_second = 5;
+ Config.heartbeat_interval_second = 5;
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
index 6ba2d271566..ac13900d2cf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
@@ -17,7 +17,7 @@
package org.apache.doris.qe;
-import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Config;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
@@ -47,7 +47,7 @@ public class SimpleSchedulerTest {
@BeforeClass
public static void setUp() {
SimpleScheduler.init();
- FeConstants.heartbeat_interval_second = 2;
+ Config.heartbeat_interval_second = 2;
be1 = new Backend(1000L, "192.168.100.0", 9050);
be2 = new Backend(1001L, "192.168.100.1", 9050);
be3 = new Backend(1002L, "192.168.100.2", 9050);
@@ -139,7 +139,7 @@ public class SimpleSchedulerTest {
t3.join();
Assert.assertFalse(SimpleScheduler.isAvailable(be1));
- Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000);
+ Thread.sleep((Config.heartbeat_interval_second + 5) * 1000);
Assert.assertTrue(SimpleScheduler.isAvailable(be1));
}
@@ -194,7 +194,7 @@ public class SimpleSchedulerTest {
System.out.println(e.getMessage());
}
- Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000);
+ Thread.sleep((Config.heartbeat_interval_second + 5) * 1000);
Assert.assertNotNull(SimpleScheduler.getHost(locations.get(0).backend_id,
locations, backends, ref));
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 9108570e5e4..ea63a5e18b1 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -57,8 +57,8 @@ public class DatabaseTransactionMgrTest {
private static Env slaveEnv;
private static Map<String, Long> LabelToTxnId;
- private TransactionState.TxnCoordinator transactionSource =
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+ private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.FE, 0, "localfe",
System.currentTimeMillis());
public static void setTransactionFinishPublish(TransactionState
transactionState, List<Long> backendIds) {
for (long backendId : backendIds) {
@@ -118,7 +118,9 @@ public class DatabaseTransactionMgrTest {
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId1);
labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
- TransactionState.TxnCoordinator beTransactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
+ // txn 2, 3, 4
+ TransactionState.TxnCoordinator beTransactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.BE, 0, "be1",
System.currentTimeMillis());
long transactionId2 =
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLabel2,
beTransactionSource,
@@ -204,7 +206,7 @@ public class DatabaseTransactionMgrTest {
@Test
public void testGetTransactionIdByCoordinateBe() throws UserException {
DatabaseTransactionMgr masterDbTransMgr =
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
- List<Pair<Long, Long>> transactionInfoList =
masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10);
+ List<Pair<Long, Long>> transactionInfoList =
masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10);
Assert.assertEquals(3, transactionInfoList.size());
Assert.assertEquals(CatalogTestUtil.testDbId1,
transactionInfoList.get(0).first.longValue());
Assert.assertEquals(TransactionStatus.PREPARE,
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 414f5cf03c4..cc00237a4c4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -77,7 +77,8 @@ public class GlobalTransactionMgrTest {
private static Env masterEnv;
private static Env slaveEnv;
- private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+ private TransactionState.TxnCoordinator transactionSource = new
TransactionState.TxnCoordinator(
+ TransactionState.TxnSourceType.FE, 0, "localfe",
System.currentTimeMillis());
@Before
public void setUp() throws InstantiationException, IllegalAccessException,
IllegalArgumentException,
@@ -323,7 +324,9 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L,
Lists.newArrayList(1L), 1L, "label", null,
- LoadJobSourceType.ROUTINE_LOAD_TASK, new
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+ LoadJobSourceType.ROUTINE_LOAD_TASK,
+ new TxnCoordinator(TxnSourceType.BE, 0, "be1",
System.currentTimeMillis()),
+ routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
@@ -395,7 +398,9 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L,
Lists.newArrayList(1L), 1L, "label", null,
- LoadJobSourceType.ROUTINE_LOAD_TASK, new
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+ LoadJobSourceType.ROUTINE_LOAD_TASK,
+ new TxnCoordinator(TxnSourceType.BE, 0, "be1",
System.currentTimeMillis()),
+ routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
index c20b2097f8d..f08b7478d06 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
@@ -61,8 +61,9 @@ public class TransactionStateTest {
UUID uuid = UUID.randomUUID();
TransactionState transactionState = new TransactionState(1000L,
Lists.newArrayList(20000L, 20001L),
3000, "label123", new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()),
- LoadJobSourceType.BACKEND_STREAMING, new
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L,
- 60 * 1000L);
+ LoadJobSourceType.BACKEND_STREAMING,
+ new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1",
System.currentTimeMillis()),
+ 50000L, 60 * 1000L);
transactionState.write(out);
out.flush();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index c61fcd28afe..59e4eae0a37 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -405,7 +405,7 @@ public abstract class TestWithFeService {
}
private void checkBEHeartbeat(List<Backend> bes) throws
InterruptedException {
- int maxTry = FeConstants.heartbeat_interval_second + 5;
+ int maxTry = Config.heartbeat_interval_second + 5;
boolean allAlive = false;
while (maxTry-- > 0 && !allAlive) {
Thread.sleep(1000);
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index da4c915e9b0..75a6537ee85 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -559,6 +559,7 @@ struct TLoadTxnBeginRequest {
10: optional i64 timeout
11: optional Types.TUniqueId request_id
12: optional string token
+ 13: optional i64 backend_id
}
struct TLoadTxnBeginResult {
@@ -581,6 +582,7 @@ struct TBeginTxnRequest {
9: optional i64 timeout
10: optional Types.TUniqueId request_id
11: optional string token
+ 12: optional i64 backend_id
}
struct TBeginTxnResult {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index afc23cc2cc0..9a14346af10 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -422,6 +422,41 @@ class Suite implements GroovyInterceptable {
return result;
}
+    /**
+     * Look up the id of {@code tableName} in the database this suite runs against.
+     */
+    long getTableId(String tableName) {
+        return getTableId(getDbName(), tableName)
+    }
+
+    /**
+     * Look up a table id by scanning {@code show proc '/dbs'} for the database
+     * (a "default_cluster:" prefix on the listed name, if present, is ignored)
+     * and then {@code show proc '/dbs/<dbId>'} for the table.
+     *
+     * @throws IllegalStateException when no matching database/table is found.
+     *         Previously the method ended without returning a value in that
+     *         case, which for a primitive {@code long} return type surfaces as
+     *         a confusing cast failure instead of a clear message.
+     */
+    long getTableId(String dbName, String tableName) {
+        def dbInfo = sql "show proc '/dbs'"
+        for (List<Object> row : dbInfo) {
+            if (row[1].replace("default_cluster:", "").equals(dbName)) {
+                def tbInfo = sql "show proc '/dbs/${row[0]}' "
+                for (List<Object> tb : tbInfo) {
+                    if (tb[1].equals(tableName)) {
+                        return tb[0].toLong()
+                    }
+                }
+            }
+        }
+        throw new IllegalStateException("table ${tableName} not found in database ${dbName}")
+    }
+
+    /**
+     * Look up the id of the database this suite runs against.
+     */
+    long getDbId() {
+        return getDbId(getDbName())
+    }
+
+    /**
+     * Look up a database id by scanning {@code show proc '/dbs'}; a
+     * "default_cluster:" prefix on the listed name, if present, is ignored.
+     *
+     * @throws IllegalStateException when no database with that name exists,
+     *         instead of falling off the end of a method declared to return
+     *         a primitive {@code long}.
+     */
+    long getDbId(String dbName) {
+        def dbInfo = sql "show proc '/dbs'"
+        for (List<Object> row : dbInfo) {
+            if (row[1].replace("default_cluster:", "").equals(dbName)) {
+                return row[0].toLong()
+            }
+        }
+        throw new IllegalStateException("database ${dbName} not found")
+    }
+
+    /** Name of the database this suite runs against, as held by the suite context. */
+    String getDbName() {
+        context.dbName
+    }
+
List<List<Object>> order_sql(String sqlStr) {
return sql(sqlStr, true)
}
@@ -681,6 +716,33 @@ class Suite implements GroovyInterceptable {
return hdfs.downLoad(label)
}
+    /**
+     * Create a small (id int, name varchar) demo table if it does not exist,
+     * with one replica per backend reported by {@code show backends}, then
+     * stream-load {@code demo_p0/streamload_input.csv} into it.
+     *
+     * @param tableName            table to create and load into
+     * @param coordidateBeHostPort optional "host:httpPort" of a backend to send
+     *                             the load to directly; empty leaves routing to
+     *                             the streamLoad action's defaults
+     * @throws IllegalArgumentException if coordidateBeHostPort is non-empty but
+     *         not of the form "host:port"
+     */
+    void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") {
+        // Parse the optional direct target before creating anything, so a
+        // malformed value fails fast with a clear message rather than an
+        // index error from substring() after the table already exists.
+        String directHost = null
+        int directHttpPort = 0
+        if (!coordidateBeHostPort.equals("")) {
+            def pos = coordidateBeHostPort.indexOf(':')
+            if (pos <= 0 || pos == coordidateBeHostPort.length() - 1) {
+                throw new IllegalArgumentException(
+                        "expect coordinate backend as 'host:httpPort', got '${coordidateBeHostPort}'")
+            }
+            directHost = coordidateBeHostPort.substring(0, pos)
+            directHttpPort = coordidateBeHostPort.substring(pos + 1).toInteger()
+        }
+
+        def backends = sql_return_maparray "show backends"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                id int,
+                name varchar(255)
+            )
+            DISTRIBUTED BY HASH(id) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "${backends.size()}"
+            )
+        """
+
+        streamLoad {
+            table tableName
+            set 'column_separator', ','
+            file context.config.dataPath + "/demo_p0/streamload_input.csv"
+            time 10000
+            if (directHost != null) {
+                directToBe directHost, directHttpPort
+            }
+        }
+    }
+
void streamLoad(Closure actionSupplier) {
runAction(new StreamLoadAction(context), actionSupplier)
}
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy
b/regression-test/suites/demo_p0/streamLoad_action.groovy
index a11aed7a1c5..b0ca182c8d9 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++ b/regression-test/suites/demo_p0/streamLoad_action.groovy
@@ -124,6 +124,11 @@ suite("streamLoad_action") {
LIMIT 5;
"""
+ def tableName2 = "test_streamload_action2"
+ runStreamLoadExample(tableName2)
+
sql """ DROP TABLE ${tableName} """
+ sql """ DROP TABLE ${tableName2}"""
+
sql """ DROP TABLE B """
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
new file mode 100644
index 00000000000..cd2b8ea9e48
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+
+// Checks the lifecycle of a PREPARE stream-load txn whose coordinator BE goes away:
+// while the BE is only stopped the txn stays PREPARE (the FE config
+// abort_txn_after_lost_heartbeat_time_second is raised to 3600 below), and after
+// the BE is started again the txn is expected to end up ABORTED.
+suite('test_coordidator_be_restart') {
+ // Non-cloud docker cluster with debug points enabled (used by the
+ // GetDebugPoint() calls below).
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ options.enableDebugPoints()
+
+ docker(options) {
+ // NOTE(review): `db` is not used anywhere below.
+ def db = context.config.getDbNameByFile(context.file)
+ def tableName1 = 'tbl_test_coordidator_be_restart_t1'
+ // Large value so that merely stopping the BE does not abort its txns
+ // within the test window (see the shutdown check below).
+ setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600)
+
+ def dbId = getDbId()
+
+ // Start from a database with no running and no finished txns.
+ def txns = sql_return_maparray "show proc
'/transactions/${dbId}/running'"
+ assertEquals(0, txns.size())
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+ assertEquals(0, txns.size())
+
+ // The first backend of the cluster acts as the coordinator of the load.
+ def coordinatorBe = cluster.getAllBackends().get(0)
+ def coordinatorBeHost = coordinatorBe.host
+
+ // Redirect the FE's stream-load routing to coordinatorBe, and enable a BE
+ // debug point that presumably blocks in commit_txn so the txn stays in
+ // PREPARE (asserted below) -- confirm against the debug point definitions.
+
GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId',
[value: coordinatorBe.backendId])
+
GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block')
+
+ // Run the load in the background; the main flow stops the coordinator BE
+ // while this load is still in flight.
+ thread {
+ try {
+ runStreamLoadExample(tableName1, coordinatorBe.host + ':' +
coordinatorBe.httpPort)
+ } catch (NoHttpResponseException t) {
+ // be down will raise NoHttpResponseException
+ }
+ }
+
+ // Give the load time to begin its txn: exactly one PREPARE txn, none finished.
+ sleep(5000)
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+ logger.info('running txns: ' + txns)
+ assertEquals(1, txns.size())
+ for (def txn : txns) {
+ assertEquals('PREPARE', txn.TransactionStatus)
+ }
+
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+ assertEquals(0, txns.size())
+
+ // coordinatorBe shutdown does not abort the txn because abort_txn_after_lost_heartbeat_time_second = 3600
+ cluster.stopBackends(coordinatorBe.index)
+ // Poll `show backends` (up to 10 tries, 1s apart) until the FE reports the
+ // coordinator as not alive.
+ // NOTE(review): `be` is null if no row matches coordinatorBeHost, which
+ // would fail with an NPE rather than an assertion message.
+ def isDead = false
+ for (def i = 0; i < 10; i++) {
+ def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
+ if (!be.Alive.toBoolean()) {
+ isDead = true
+ break
+ }
+ sleep 1000
+ }
+ assertTrue(isDead)
+ // The BE being dead alone must leave the txn untouched in PREPARE.
+ sleep 5000
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+ logger.info('running txns: ' + txns)
+ assertEquals(1, txns.size())
+ for (def txn : txns) {
+ assertEquals('PREPARE', txn.TransactionStatus)
+ }
+
+ // coordinatorBe restart, abort txn on it
+ cluster.startBackends(coordinatorBe.index)
+ // Poll `show backends` (up to 20 tries, 1s apart) until the coordinator is
+ // reported alive again (same null-`be` caveat as above).
+ def isAlive = false
+ for (def i = 0; i < 20; i++) {
+ def be = sql_return_maparray('show backends').find { it.Host ==
coordinatorBeHost }
+ if (be.Alive.toBoolean()) {
+ isAlive = true
+ break
+ }
+ sleep 1000
+ }
+ assertTrue(isAlive)
+ // After the restart the txn must have moved from running to finished as ABORTED.
+ sleep 5000
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+ logger.info('running txns: ' + txns)
+ assertEquals(0, txns.size())
+ txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+ logger.info('finished txns: ' + txns)
+ assertEquals(1, txns.size())
+ for (def txn : txns) {
+ assertEquals('ABORTED', txn.TransactionStatus)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]