This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4886ad8333d [fix](cloud) fix routine load can not consume in cloud 
mode (#33248)
4886ad8333d is described below

commit 4886ad8333d3ab66d39dab5494fa1b8717ce28b2
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Apr 4 19:57:31 2024 +0800

    [fix](cloud) fix routine load can not consume in cloud mode (#33248)
    
    Co-authored-by: yujun <[email protected]>
---
 .../load/routineload/KafkaRoutineLoadJob.java      |  3 ++-
 .../doris/load/routineload/RoutineLoadJob.java     | 22 ++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 1067a759e5f..d2b9410568b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -302,7 +302,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     protected boolean checkCommitInfo(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment,
                                       TransactionState txnState,
                                       TransactionState.TxnStatusChangeReason 
txnStatusChangeReason) {
-        if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+        if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED
+                    || txnState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
             // For committed txn, update the progress.
             return true;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 20c1b999d03..1af051e183c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -993,8 +993,26 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         Preconditions.checkNotNull(planner);
         Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         Table table = db.getTableOrMetaException(tableId, 
Table.TableType.OLAP);
+        boolean needCleanCtx = false;
         table.readLock();
         try {
+            if (Config.isCloudMode()) {
+                String clusterName = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
+                        .getClusterNameByClusterId(cloudClusterId);
+                if (Strings.isNullOrEmpty(clusterName)) {
+                    String err = String.format("cluster name is empty, cluster 
id is %s", cloudClusterId);
+                    LOG.warn(err);
+                    throw new UserException(err);
+                }
+                if (ConnectContext.get() == null) {
+                    ConnectContext ctx = new ConnectContext();
+                    ctx.setThreadLocalInfo();
+                    ctx.setCloudCluster(clusterName);
+                    needCleanCtx = true;
+                } else {
+                    ConnectContext.get().setCloudCluster(clusterName);
+                }
+            }
             TPipelineFragmentParams planParams = 
planner.planForPipeline(loadId);
             // add table indexes to transaction state
             TransactionState txnState = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId);
@@ -1008,6 +1026,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
 
             return planParams;
         } finally {
+            if (needCleanCtx) {
+                ConnectContext.remove();
+            }
             table.readUnlock();
         }
     }
@@ -1081,6 +1102,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     @Override
     public void afterCommitted(TransactionState txnState, boolean txnOperated) 
throws UserException {
         long taskBeId = -1L;
+        writeLock();
         try {
             if (txnOperated) {
                 // find task in job


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to