This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a1aa9b8ab9f [fix](routine load) add read lock to fix some concurrent
bugs (#39242) (#39525)
a1aa9b8ab9f is described below
commit a1aa9b8ab9f242dc81784d7e15972085d53d2ce4
Author: hui lai <[email protected]>
AuthorDate: Mon Aug 19 21:18:27 2024 +0800
[fix](routine load) add read lock to fix some concurrent bugs (#39242)
(#39525)
pick #39242
---
.../doris/load/routineload/RoutineLoadJob.java | 53 +++++++++++++---------
1 file changed, 31 insertions(+), 22 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5ef531bb37f..d3bd24a0b3a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1566,24 +1566,28 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
public List<List<String>> getTasksShowInfo() throws AnalysisException {
List<List<String>> rows = Lists.newArrayList();
- if (null == routineLoadTaskInfoList ||
routineLoadTaskInfoList.isEmpty()) {
+ readLock();
+ try {
+ if (null == routineLoadTaskInfoList ||
routineLoadTaskInfoList.isEmpty()) {
+ return rows;
+ }
+ routineLoadTaskInfoList.forEach(entity -> {
+ long txnId = entity.getTxnId();
+ if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
+ rows.add(entity.getTaskShowInfo());
+ return;
+ }
+ TransactionState transactionState =
Env.getCurrentGlobalTransactionMgr()
+ .getTransactionState(dbId, entity.getTxnId());
+ if (null != transactionState && null !=
transactionState.getTransactionStatus()) {
+
entity.setTxnStatus(transactionState.getTransactionStatus());
+ }
+ rows.add(entity.getTaskShowInfo());
+ });
return rows;
+ } finally {
+ readUnlock();
}
-
- routineLoadTaskInfoList.forEach(entity -> {
- long txnId = entity.getTxnId();
- if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
- rows.add(entity.getTaskShowInfo());
- return;
- }
- TransactionState transactionState =
Env.getCurrentGlobalTransactionMgr()
- .getTransactionState(dbId, entity.getTxnId());
- if (null != transactionState && null !=
transactionState.getTransactionStatus()) {
- entity.setTxnStatus(transactionState.getTransactionStatus());
- }
- rows.add(entity.getTaskShowInfo());
- });
- return rows;
}
public String getShowCreateInfo() {
@@ -1699,12 +1703,17 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
private String getTaskStatistic() {
Map<String, String> result = Maps.newHashMap();
- result.put("running_task",
- String.valueOf(routineLoadTaskInfoList.stream().filter(entity
-> entity.isRunning()).count()));
- result.put("waiting_task",
- String.valueOf(routineLoadTaskInfoList.stream().filter(entity
-> !entity.isRunning()).count()));
- Gson gson = new GsonBuilder().disableHtmlEscaping().create();
- return gson.toJson(result);
+ readLock();
+ try {
+ result.put("running_task",
+
String.valueOf(routineLoadTaskInfoList.stream().filter(entity ->
entity.isRunning()).count()));
+ result.put("waiting_task",
+
String.valueOf(routineLoadTaskInfoList.stream().filter(entity ->
!entity.isRunning()).count()));
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ return gson.toJson(result);
+ } finally {
+ readUnlock();
+ }
}
private String jobPropertiesToJsonString() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]