This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new cbf74bf50b9 [fix](streaming job) fix register callback id invalid (#56142)
cbf74bf50b9 is described below

commit cbf74bf50b9173eac23bd712f6b15959e242fb5b
Author: hui lai <[email protected]>
AuthorDate: Wed Sep 17 15:16:34 2025 +0800

    [fix](streaming job) fix register callback id invalid (#56142)
    
    ### What problem does this PR solve?
    
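    Fix the invalid callback id registered for streaming insert transactions.
    The callback id is now set on the transaction state in `onComplete`
    instead of being passed to `beginTransaction`.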
---
 .../plans/commands/insert/OlapInsertExecutor.java  | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index f41d8893f02..b7141dd9ed0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -99,26 +99,16 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
                 throw new BeginTransactionException("current running txns on db is larger than limit");
             }
             this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    database.getId(), ImmutableList.of(table.getId()), labelName, null,
+                    database.getId(), ImmutableList.of(table.getId()), labelName,
                     new TxnCoordinator(TxnSourceType.FE, 0,
                             FrontendOptions.getLocalHostAddress(),
                             ExecuteEnv.getInstance().getStartupTime()),
-                    LoadJobSourceType.INSERT_STREAMING, getListenerId(), ctx.getExecTimeoutS());
+                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
         } catch (Exception e) {
             throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
         }
     }
 
-    private long getListenerId() {
-        long listenerId = -1;
-        StreamingInsertTask streamingInsertTask =
-                Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
-        if (streamingInsertTask != null) {
-            listenerId = streamingInsertTask.getJobId();
-        }
-        return listenerId;
-    }
-
     @Override
     public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
         OlapTableSink olapTableSink = (OlapTableSink) sink;
@@ -199,6 +189,7 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
 
     @Override
     protected void onComplete() throws UserException {
+        setTxnCallbackId();
         if (ctx.getState().getStateType() == MysqlStateType.ERR) {
             try {
                 String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
@@ -234,6 +225,18 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
         }
     }
 
+    private void setTxnCallbackId() {
+        TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
+        if (state == null) {
+            throw new AnalysisException("txn does not exist: " + txnId);
+        }
+        StreamingInsertTask streamingInsertTask =
+                Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
+        if (streamingInsertTask != null) {
+            state.setCallbackId(streamingInsertTask.getJobId());
+        }
+    }
+
     @Override
     protected void onFail(Throwable t) {
         errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to