This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4886ad8333d [fix](cloud) fix routine load can not consume in cloud mode (#33248)
4886ad8333d is described below
commit 4886ad8333d3ab66d39dab5494fa1b8717ce28b2
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Apr 4 19:57:31 2024 +0800
[fix](cloud) fix routine load can not consume in cloud mode (#33248)
Co-authored-by: yujun <[email protected]>
---
.../load/routineload/KafkaRoutineLoadJob.java | 3 ++-
.../doris/load/routineload/RoutineLoadJob.java | 22 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 1067a759e5f..d2b9410568b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -302,7 +302,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment,
TransactionState txnState,
TransactionState.TxnStatusChangeReason
txnStatusChangeReason) {
- if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+ if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED
+ || txnState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
// For committed txn, update the progress.
return true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 20c1b999d03..1af051e183c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -993,8 +993,26 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
Preconditions.checkNotNull(planner);
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
Table table = db.getTableOrMetaException(tableId,
Table.TableType.OLAP);
+ boolean needCleanCtx = false;
table.readLock();
try {
+ if (Config.isCloudMode()) {
+ String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getClusterNameByClusterId(cloudClusterId);
+ if (Strings.isNullOrEmpty(clusterName)) {
+ String err = String.format("cluster name is empty, cluster
id is %s", cloudClusterId);
+ LOG.warn(err);
+ throw new UserException(err);
+ }
+ if (ConnectContext.get() == null) {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
+ ctx.setCloudCluster(clusterName);
+ needCleanCtx = true;
+ } else {
+ ConnectContext.get().setCloudCluster(clusterName);
+ }
+ }
TPipelineFragmentParams planParams =
planner.planForPipeline(loadId);
// add table indexes to transaction state
TransactionState txnState =
Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId);
@@ -1008,6 +1026,9 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
return planParams;
} finally {
+ if (needCleanCtx) {
+ ConnectContext.remove();
+ }
table.readUnlock();
}
}
@@ -1081,6 +1102,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
@Override
public void afterCommitted(TransactionState txnState, boolean txnOperated)
throws UserException {
long taskBeId = -1L;
+ writeLock();
try {
if (txnOperated) {
// find task in job
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]