This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new cbf74bf50b9 [fix](streaming job) fix register callback id invalid
(#56142)
cbf74bf50b9 is described below
commit cbf74bf50b9173eac23bd712f6b15959e242fb5b
Author: hui lai <[email protected]>
AuthorDate: Wed Sep 17 15:16:34 2025 +0800
[fix](streaming job) fix register callback id invalid (#56142)
### What problem does this PR solve?
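Fix the invalid callback id registered for the streaming insert transaction: the callback id is now set on the transaction state in onComplete() instead of being passed as a listener id to beginTransaction().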
---
.../plans/commands/insert/OlapInsertExecutor.java | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index f41d8893f02..b7141dd9ed0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -99,26 +99,16 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
throw new BeginTransactionException("current running txns on
db is larger than limit");
}
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
- database.getId(), ImmutableList.of(table.getId()),
labelName, null,
+ database.getId(), ImmutableList.of(table.getId()),
labelName,
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
- LoadJobSourceType.INSERT_STREAMING, getListenerId(),
ctx.getExecTimeoutS());
+ LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " +
e.getMessage(), e);
}
}
- private long getListenerId() {
- long listenerId = -1;
- StreamingInsertTask streamingInsertTask =
-
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
- if (streamingInsertTask != null) {
- listenerId = streamingInsertTask.getJobId();
- }
- return listenerId;
- }
-
@Override
public void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
OlapTableSink olapTableSink = (OlapTableSink) sink;
@@ -199,6 +189,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
@Override
protected void onComplete() throws UserException {
+ setTxnCallbackId();
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg =
Strings.emptyToNull(ctx.getState().getErrorMessage());
@@ -234,6 +225,18 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
}
}
+ private void setTxnCallbackId() {
+ TransactionState state =
Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(),
txnId);
+ if (state == null) {
+ throw new AnalysisException("txn does not exist: " + txnId);
+ }
+ StreamingInsertTask streamingInsertTask =
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
+ if (streamingInsertTask != null) {
+ state.setCallbackId(streamingInsertTask.getJobId());
+ }
+ }
+
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]