This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new d61c7deea44 branch-4.0: [fix](streaming job) fix streaming job statistic never update #56667 (#56766)
d61c7deea44 is described below

commit d61c7deea44b3a59114eb8bed7b3ac318d7cc712
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 15:57:11 2025 +0800

    branch-4.0: [fix](streaming job) fix streaming job statistic never update #56667 (#56766)
    
    Cherry-picked from #56667
    
    Co-authored-by: hui lai <[email protected]>
---
 .../transaction/CloudGlobalTransactionMgr.java     | 11 +++++--
 .../apache/doris/cloud/transaction/TxnUtil.java    |  6 ++++
 .../insert/streaming/StreamingInsertJob.java       | 11 ++++---
 .../insert/streaming/StreamingInsertTask.java      |  2 +-
 .../StreamingTaskTxnCommitAttachment.java          |  6 ++++
 .../job/offset/s3/S3SourceOffsetProvider.java      |  6 ++--
 .../apache/doris/load/loadv2/InsertLoadJob.java    |  4 +++
 .../commands/insert/AbstractInsertExecutor.java    | 20 +++++++++++-
 .../commands/insert/InsertIntoTableCommand.java    |  8 +++--
 .../insert/OlapGroupCommitInsertExecutor.java      |  4 +--
 .../plans/commands/insert/OlapInsertExecutor.java  | 36 +++++++++++++++++-----
 .../commands/insert/OlapTxnInsertExecutor.java     |  4 +--
 .../streaming_job/test_streaming_insert_job.groovy | 20 ++++++------
 13 files changed, 105 insertions(+), 33 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c3b9c321fa9..fa3617a96a5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -623,16 +623,18 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             } else if (txnCommitAttachment instanceof 
StreamingTaskTxnCommitAttachment) {
                 StreamingTaskTxnCommitAttachment 
streamingTaskTxnCommitAttachment =
                             (StreamingTaskTxnCommitAttachment) 
txnCommitAttachment;
-                TxnStateChangeCallback cb = 
callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId());
+                TxnStateChangeCallback cb = 
callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getJobId());
+                TxnCommitAttachment commitAttachment = null;
                 if (cb != null) {
                     // use a temporary transaction state to do before commit 
check,
                     // what actually works is the transactionId
                     TransactionState tmpTxnState = new TransactionState();
                     tmpTxnState.setTransactionId(transactionId);
                     cb.beforeCommitted(tmpTxnState);
+                    commitAttachment = tmpTxnState.getTxnCommitAttachment();
                 }
                 builder.setCommitAttachment(TxnUtil
-                        
.streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment));
+                        
.streamingTaskTxnCommitAttachmentToPb((StreamingTaskTxnCommitAttachment) 
commitAttachment));
             } else {
                 throw new UserException("invalid txnCommitAttachment");
             }
@@ -676,6 +678,11 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
                 RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
                 callbackId = rlTaskTxnCommitAttachment.getJobId();
+            } else if (txnCommitAttachment != null
+                        && txnCommitAttachment instanceof 
StreamingTaskTxnCommitAttachment) {
+                StreamingTaskTxnCommitAttachment 
streamingTaskTxnCommitAttachment =
+                        (StreamingTaskTxnCommitAttachment) txnCommitAttachment;
+                callbackId = streamingTaskTxnCommitAttachment.getJobId();
             } else if (txnState != null) {
                 callbackId = txnState.getCallbackId();
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 06cd25d1357..b947ef0649e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -373,6 +373,12 @@ public class TxnUtil {
                 commitAttachment =
                     
TxnUtil.rtTaskTxnCommitAttachmentFromPb(txnInfo.getCommitAttachment());
             }
+
+            if (txnInfo.getCommitAttachment().getType()
+                    == 
TxnCommitAttachmentPB.Type.STREAMING_TASK_TXN_COMMIT_ATTACHMENT) {
+                commitAttachment =
+                    
TxnUtil.streamingTaskTxnCommitAttachmentFromPb(txnInfo.getCommitAttachment());
+            }
         }
 
         long prepareTime = txnInfo.hasPrepareTime() ? txnInfo.getPrepareTime() 
: -1;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f70cbc37fca..0a9b27e7632 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -41,6 +41,7 @@ import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.job.offset.SourceOffsetProviderFactory;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
@@ -295,8 +296,6 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             StreamingInsertTask nextTask = createStreamingInsertTask();
             this.runningStreamTask = nextTask;
-            //todo: maybe fetch from txn attachment?
-            offsetProvider.updateOffset(task.getRunningOffset());
         } finally {
             writeUnlock();
         }
@@ -417,8 +416,12 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             taskIds.add(runningStreamTask.getTaskId());
             // todo: Check whether the taskid of runningtask is consistent 
with the taskid associated with txn
 
-            // todo: need get loadStatistic, load manager statistic is empty
-            LoadStatistic loadStatistic = new LoadStatistic();
+            List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
+            if (loadJobs.size() == 0) {
+                throw new TransactionException("load job not found, insert job 
id is " + runningStreamTask.getTaskId());
+            }
+            LoadJob loadJob = loadJobs.get(0);
+            LoadStatistic loadStatistic = loadJob.getLoadStatistic();
             txnState.setTxnCommitAttachment(new 
StreamingTaskTxnCommitAttachment(
                         getJobId(),
                         runningStreamTask.getTaskId(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d1a72b8bd24..433e372abf5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -127,6 +127,7 @@ public class StreamingInsertTask {
 
         this.runningOffset = offsetProvider.getNextOffset(jobProperties, 
originTvfProps);
         InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new 
NereidsParser().parseSingle(sql);
+        baseCommand.setJobId(getTaskId());
         StmtExecutor baseStmtExecutor =
                 new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, 
ctx.getStatementContext()));
         baseCommand.initPlan(ctx, baseStmtExecutor, false);
@@ -135,7 +136,6 @@ public class StreamingInsertTask {
         }
         this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, 
runningOffset);
         this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER 
+ getTaskId()));
-        this.taskCommand.setJobId(getTaskId());
         this.stmtExecutor = new StmtExecutor(ctx, new 
LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index b3f6a2c0b1b..01fdb3e16e4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -23,9 +23,14 @@ import org.apache.doris.transaction.TxnCommitAttachment;
 
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
+import lombok.Setter;
 
 public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
 
+    public StreamingTaskTxnCommitAttachment() {
+        super(TransactionState.LoadJobSourceType.STREAMING_JOB);
+    }
+
     public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
                 long scannedRows, long loadBytes, long numFiles, long 
fileBytes, String offset) {
         super(TransactionState.LoadJobSourceType.STREAMING_JOB);
@@ -48,6 +53,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
     }
 
     @Getter
+    @Setter
     private long jobId;
     @Getter
     private long taskId;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 74743d09399..01986b40c50 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -139,8 +139,10 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
             }
             return plan;
         });
-        return new InsertIntoTableCommand((LogicalPlan) rewritePlan, 
Optional.empty(), Optional.empty(),
-                Optional.empty(), true, Optional.empty());
+        InsertIntoTableCommand insertIntoTableCommand = new 
InsertIntoTableCommand((LogicalPlan) rewritePlan,
+                Optional.empty(), Optional.empty(), Optional.empty(), true, 
Optional.empty());
+        insertIntoTableCommand.setJobId(originCommand.getJobId());
+        return insertIntoTableCommand;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index e52a87fe926..a8eec265b5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -57,6 +57,10 @@ public class InsertLoadJob extends LoadJob {
         super(EtlJobType.INSERT, dbId, label);
     }
 
+    public InsertLoadJob(long dbId, String label, long jobId) {
+        super(EtlJobType.INSERT, dbId, label, jobId);
+    }
+
     public InsertLoadJob(String label, long transactionId, long dbId, long 
tableId,
             long createTimestamp, String failMsg, String trackingUrl, String 
firstErrorMsg,
             UserIdentity userInfo) throws MetaNotFoundException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index bc032a9d61f..9b6226b417a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -71,7 +71,7 @@ public abstract class AbstractInsertExecutor {
     protected long txnId = INVALID_TXN_ID;
 
     /**
-     * Constructor
+     * Randomly generate job ID.
      */
     public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
             Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
@@ -87,6 +87,24 @@ public abstract class AbstractInsertExecutor {
         this.emptyInsert = emptyInsert;
     }
 
+    /**
+     * Specify job ID.
+     */
+    public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx, boolean emptyInsert, 
long jobId) {
+        this.ctx = ctx;
+        this.database = table.getDatabase();
+        this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, 
jobId);
+        ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
+        this.coordinator = EnvFactory.getInstance().createCoordinator(
+                ctx, planner, ctx.getStatsErrorEstimator(), 
insertLoadJob.getId());
+        this.labelName = labelName;
+        this.table = table;
+        this.insertCtx = insertCtx;
+        this.emptyInsert = emptyInsert;
+        this.jobId = jobId;
+    }
+
     public Coordinator getCoordinator() {
         return coordinator;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index a0a43814486..6e6e499b089 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -132,6 +132,7 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
         this.cte = cte;
         this.needNormalizePlan = needNormalizePlan;
         this.branchName = branchName;
+        this.jobId = Env.getCurrentEnv().getNextId();
     }
 
     /**
@@ -356,7 +357,8 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
                             planner,
                             dataSink,
                             physicalSink,
-                            () -> new OlapTxnInsertExecutor(ctx, olapTable, 
label, planner, insertCtx, emptyInsert)
+                            () -> new OlapTxnInsertExecutor(
+                                    ctx, olapTable, label, planner, insertCtx, 
emptyInsert, jobId)
                     );
                 } else if (ctx.isGroupCommit()) {
                     Backend groupCommitBackend = Env.getCurrentEnv()
@@ -369,7 +371,7 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
                             dataSink,
                             physicalSink,
                             () -> new OlapGroupCommitInsertExecutor(
-                                    ctx, olapTable, label, planner, insertCtx, 
emptyInsert, groupCommitBackend
+                                    ctx, olapTable, label, planner, insertCtx, 
emptyInsert, groupCommitBackend, jobId
                             )
                     );
                 } else {
@@ -377,7 +379,7 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
                             planner,
                             dataSink,
                             physicalSink,
-                            () -> new OlapInsertExecutor(ctx, olapTable, 
label, planner, insertCtx, emptyInsert)
+                            () -> new OlapInsertExecutor(ctx, olapTable, 
label, planner, insertCtx, emptyInsert, jobId)
                     );
                 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index a2c7e280da6..803b009d136 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -65,8 +65,8 @@ public class OlapGroupCommitInsertExecutor extends 
OlapInsertExecutor {
 
     public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table,
             String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx,
-            boolean emptyInsert, Backend backend) {
-        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+            boolean emptyInsert, Backend backend, long jobId) {
+        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
         this.groupCommitBackend = backend;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 357523a7b4b..e7d19f07577 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
+import 
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -59,6 +60,7 @@ import 
org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.transaction.TxnCommitAttachment;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -84,8 +86,9 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
      * constructor
      */
     public OlapInsertExecutor(ConnectContext ctx, Table table,
-            String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
-        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+            String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
+            long jobId) {
+        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
         this.olapTable = (OlapTable) table;
     }
 
@@ -190,6 +193,7 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
 
     @Override
     protected void onComplete() throws UserException {
+        TxnCommitAttachment txnCommitAttachment = buildTxnAttachment();
         setTxnCallbackId();
         if (ctx.getState().getStateType() == MysqlStateType.ERR) {
             try {
@@ -204,7 +208,7 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                 database, Lists.newArrayList((Table) table),
                 txnId,
                 TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
-                ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
+                ctx.getSessionVariable().getInsertVisibleTimeoutMs(), 
txnCommitAttachment)) {
             txnStatus = TransactionStatus.VISIBLE;
         } else {
             txnStatus = TransactionStatus.COMMITTED;
@@ -231,13 +235,31 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
         if (state == null) {
             throw new AnalysisException("txn does not exist: " + txnId);
         }
-        StreamingInsertTask streamingInsertTask =
-                
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().getStreamingInsertTaskById(jobId);
-        if (streamingInsertTask != null) {
-            state.setCallbackId(streamingInsertTask.getJobId());
+        StreamingInsertTask task = Env.getCurrentEnv()
+                                      .getJobManager()
+                                      .getStreamingTaskManager()
+                                      .getStreamingInsertTaskById(jobId);
+        if (task != null) {
+            state.setCallbackId(task.getJobId());
         }
     }
 
+    private TxnCommitAttachment buildTxnAttachment() {
+        if (!Config.isCloudMode()) {
+            return null;
+        }
+        StreamingInsertTask task = Env.getCurrentEnv()
+                                      .getJobManager()
+                                      .getStreamingTaskManager()
+                                      .getStreamingInsertTaskById(jobId);
+        if (task == null) {
+            return null;
+        }
+        StreamingTaskTxnCommitAttachment attachment = new 
StreamingTaskTxnCommitAttachment();
+        attachment.setJobId(task.getJobId());
+        return attachment;
+    }
+
     @Override
     protected void onFail(Throwable t) {
         errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
index 1512eca16e2..aa095d3f91a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
@@ -42,8 +42,8 @@ public class OlapTxnInsertExecutor extends OlapInsertExecutor 
{
 
     public OlapTxnInsertExecutor(ConnectContext ctx, Table table,
             String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx,
-            boolean emptyInsert) {
-        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+            boolean emptyInsert, long jobId) {
+        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
         txnStatus = TransactionStatus.PREPARE;
     }
 
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 6865cedd842..5695fa31c92 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -65,8 +65,8 @@ suite("test_streaming_insert_job") {
         Awaitility.await().atMost(300, SECONDS)
                 .pollInterval(1, SECONDS).until(
                 {
-                    print("check success task count")
                     def jobSuccendCount = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and 
ExecuteType='STREAMING' """
+                    log.info("jobSuccendCount: " + jobSuccendCount)
                     // check job status and succeed task count larger than 2
                     jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
                 }
@@ -74,13 +74,13 @@ suite("test_streaming_insert_job") {
     } catch (Exception ex){
         def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
         def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
-        println("show job: " + showjob)
-        println("show task: " + showtask)
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
         throw ex;
     }
 
     def jobResult = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
-    println("show success job: " + jobResult)
+    log.info("show success job: " + jobResult)
 
     qt_select """ SELECT * FROM ${tableName} order by c1 """
 
@@ -96,12 +96,14 @@ suite("test_streaming_insert_job") {
     assert pauseShowTask.size() == 0
 
 
-    def jobOffset = sql """
-        select currentOffset, endoffset from jobs("type"="insert") where 
Name='${jobName}'
+    def jobInfo = sql """
+        select currentOffset, endoffset, loadStatistic from 
jobs("type"="insert") where Name='${jobName}'
     """
-    assert jobOffset.get(0).get(0) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
-    assert jobOffset.get(0).get(1) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
-
+    log.info("jobInfo: " + jobInfo)
+    assert jobInfo.get(0).get(0) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+    assert jobInfo.get(0).get(1) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+    assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+    
     // alter streaming job
     sql """
        ALTER JOB ${jobName}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to