This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 406276e268c [fix](streaming job) add task commit check and job event lock to ensure exactly-once semantics (#56135)
406276e268c is described below

commit 406276e268cbf529bf408331664fa42511a7c3ec
Author: hui lai <[email protected]>
AuthorDate: Wed Sep 17 13:54:25 2025 +0800

    [fix](streaming job) add task commit check and job event lock to ensure exactly-once semantics (#56135)
    
    ### What problem does this PR solve?
    
    Add task commit check and job event lock to ensure exactly-once
    semantics.
---
 .../insert/streaming/StreamingInsertJob.java       | 86 +++++++++++++++-------
 .../insert/streaming/StreamingInsertTask.java      |  6 +-
 2 files changed, 63 insertions(+), 29 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 387714e8805..650b9f412b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -73,6 +73,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @Log4j2
 public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, 
Map<Object, Object>> implements
@@ -102,6 +103,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @Getter
     private long lastScheduleTaskTimestamp = -1L;
     private InsertIntoTableCommand baseCommand;
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
@@ -156,9 +158,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void updateJobStatus(JobStatus status) throws JobException {
-        super.updateJobStatus(status);
-        if (isFinalStatus()) {
-            
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
+        lock.writeLock().lock();
+        try {
+            super.updateJobStatus(status);
+            if (isFinalStatus()) {
+                
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
+            }
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
@@ -230,21 +237,29 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     public void onStreamTaskFail(StreamingInsertTask task) throws JobException 
{
-        failedTaskCount.incrementAndGet();
-        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
-        if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
-            this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, 
task.getErrMsg());
+        try {
+          failedTaskCount.incrementAndGet();
+          
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+          if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+              this.pauseReason = new 
PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
+          }
+        } finally {
+            lock.writeLock().unlock();
         }
         updateJobStatus(JobStatus.PAUSED);
     }
 
     public void onStreamTaskSuccess(StreamingInsertTask task) {
-        succeedTaskCount.incrementAndGet();
-        
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
-        StreamingInsertTask nextTask = createStreamingInsertTask();
-        this.runningStreamTask = nextTask;
-        //todo: maybe fetch from txn attachment?
-        offsetProvider.updateOffset(task.getRunningOffset());
+        try {
+            succeedTaskCount.incrementAndGet();
+            
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+            StreamingInsertTask nextTask = createStreamingInsertTask();
+            this.runningStreamTask = nextTask;
+            //todo: maybe fetch from txn attachment?
+            offsetProvider.updateOffset(task.getRunningOffset());
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment 
attachment) {
@@ -337,22 +352,37 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void beforeCommitted(TransactionState txnState) throws 
TransactionException {
-        ArrayList<Long> taskIds = new ArrayList<>();
-        taskIds.add(runningStreamTask.getTaskId());
-        List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
-        if (loadJobs.size() != 1) {
-            throw new TransactionException("load job not found, insert job id 
is " + runningStreamTask.getTaskId());
+        boolean shouldRealseLock = false;
+        lock.writeLock().lock();
+        try {
+            ArrayList<Long> taskIds = new ArrayList<>();
+            taskIds.add(runningStreamTask.getTaskId());
+            List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
+            if (loadJobs.size() != 1) {
+                throw new TransactionException("load job not found, insert job 
id is " + runningStreamTask.getTaskId());
+            }
+            LoadJob loadJob = loadJobs.get(0);
+
+            if (txnState.getTransactionId() != loadJob.getTransactionId()
+                    || 
!runningStreamTask.getStatus().equals(TaskStatus.RUNNING)) {
+                shouldRealseLock = true;
+                throw new TransactionException("txn " + 
txnState.getTransactionId() + "should be aborted.");
+            }
+
+            LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+            txnState.setTxnCommitAttachment(new 
StreamingTaskTxnCommitAttachment(
+                        getJobId(),
+                        runningStreamTask.getTaskId(),
+                        loadStatistic.getScannedRows(),
+                        loadStatistic.getLoadBytes(),
+                        loadStatistic.getFileNumber(),
+                        loadStatistic.getTotalFileSizeB(),
+                        runningStreamTask.getRunningOffset()));
+        } finally {
+            if (shouldRealseLock) {
+                lock.writeLock().unlock();
+            }
         }
-        LoadJob loadJob = loadJobs.get(0);
-        LoadStatistic loadStatistic = loadJob.getLoadStatistic();
-        txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
-                    getJobId(),
-                    runningStreamTask.getTaskId(),
-                    loadStatistic.getScannedRows(),
-                    loadStatistic.getLoadBytes(),
-                    loadStatistic.getFileNumber(),
-                    loadStatistic.getTotalFileSizeB(),
-                    runningStreamTask.getRunningOffset()));
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index b1e22a0a7cb..c171173fd3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -206,7 +206,11 @@ public class StreamingInsertTask {
         streamingInsertJob.onStreamTaskFail(this);
     }
 
-    public void cancel(boolean needWaitCancelComplete) throws Exception {
+    public void cancel(boolean needWaitCancelComplete) {
+        if (TaskStatus.SUCCESS.equals(status) || 
TaskStatus.FAILED.equals(status)
+                || TaskStatus.CANCELED.equals(status)) {
+            return;
+        }
         if (isCanceled.get()) {
             return;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to