This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a1aa9b8ab9f [fix](routine load) add read lock to fix some concurrent bugs (#39242) (#39525)
a1aa9b8ab9f is described below

commit a1aa9b8ab9f242dc81784d7e15972085d53d2ce4
Author: hui lai <[email protected]>
AuthorDate: Mon Aug 19 21:18:27 2024 +0800

    [fix](routine load) add read lock to fix some concurrent bugs (#39242) (#39525)
    
    pick #39242
---
 .../doris/load/routineload/RoutineLoadJob.java     | 53 +++++++++++++---------
 1 file changed, 31 insertions(+), 22 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5ef531bb37f..d3bd24a0b3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1566,24 +1566,28 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     public List<List<String>> getTasksShowInfo() throws AnalysisException {
         List<List<String>> rows = Lists.newArrayList();
-        if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
+        readLock();
+        try {
+            if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
+                return rows;
+            }
+            routineLoadTaskInfoList.forEach(entity -> {
+                long txnId = entity.getTxnId();
+                if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
+                    rows.add(entity.getTaskShowInfo());
+                    return;
+                }
+                TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
+                        .getTransactionState(dbId, entity.getTxnId());
+                if (null != transactionState && null != transactionState.getTransactionStatus()) {
+                    entity.setTxnStatus(transactionState.getTransactionStatus());
+                }
+                rows.add(entity.getTaskShowInfo());
+            });
             return rows;
+        } finally {
+            readUnlock();
         }
-
-        routineLoadTaskInfoList.forEach(entity -> {
-            long txnId = entity.getTxnId();
-            if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
-                rows.add(entity.getTaskShowInfo());
-                return;
-            }
-            TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
-                    .getTransactionState(dbId, entity.getTxnId());
-            if (null != transactionState && null != transactionState.getTransactionStatus()) {
-                entity.setTxnStatus(transactionState.getTransactionStatus());
-            }
-            rows.add(entity.getTaskShowInfo());
-        });
-        return rows;
     }
 
     public String getShowCreateInfo() {
@@ -1699,12 +1703,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     private String getTaskStatistic() {
         Map<String, String> result = Maps.newHashMap();
-        result.put("running_task",
-                String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count()));
-        result.put("waiting_task",
-                String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count()));
-        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
-        return gson.toJson(result);
+        readLock();
+        try {
+            result.put("running_task",
+                    String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count()));
+            result.put("waiting_task",
+                    String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count()));
+            Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+            return gson.toJson(result);
+        } finally {
+            readUnlock();
+        }
     }
 
     private String jobPropertiesToJsonString() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to