This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 406276e268c [fix](streaming job) add task commit check and job event
lock to ensure exactly-once semantics (#56135)
406276e268c is described below
commit 406276e268cbf529bf408331664fa42511a7c3ec
Author: hui lai <[email protected]>
AuthorDate: Wed Sep 17 13:54:25 2025 +0800
[fix](streaming job) add task commit check and job event lock to ensure
exactly-once semantics (#56135)
### What problem does this PR solve?
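Add a task commit check and a job event lock to ensure exactly-once
semantics. `beforeCommitted` now rejects a transaction whose id does not
match the running task's load job, or whose task is no longer RUNNING.
Job status updates and the task success/failure callbacks are serialized
through a fair, job-level `ReentrantReadWriteLock`.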
---
.../insert/streaming/StreamingInsertJob.java | 86 +++++++++++++++-------
.../insert/streaming/StreamingInsertTask.java | 6 +-
2 files changed, 63 insertions(+), 29 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 387714e8805..650b9f412b2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -73,6 +73,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
@Log4j2
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask,
Map<Object, Object>> implements
@@ -102,6 +103,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Getter
private long lastScheduleTaskTimestamp = -1L;
private InsertIntoTableCommand baseCommand;
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public StreamingInsertJob(String jobName,
JobStatus jobStatus,
@@ -156,9 +158,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void updateJobStatus(JobStatus status) throws JobException {
- super.updateJobStatus(status);
- if (isFinalStatus()) {
-
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
+ lock.writeLock().lock();
+ try {
+ super.updateJobStatus(status);
+ if (isFinalStatus()) {
+
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
+ }
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -230,21 +237,29 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
public void onStreamTaskFail(StreamingInsertTask task) throws JobException
{
- failedTaskCount.incrementAndGet();
-
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
- if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
- this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR,
task.getErrMsg());
+ try {
+ failedTaskCount.incrementAndGet();
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+ if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+ this.pauseReason = new
PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
+ }
+ } finally {
+ lock.writeLock().unlock();
}
updateJobStatus(JobStatus.PAUSED);
}
public void onStreamTaskSuccess(StreamingInsertTask task) {
- succeedTaskCount.incrementAndGet();
-
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
- StreamingInsertTask nextTask = createStreamingInsertTask();
- this.runningStreamTask = nextTask;
- //todo: maybe fetch from txn attachment?
- offsetProvider.updateOffset(task.getRunningOffset());
+ try {
+ succeedTaskCount.incrementAndGet();
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+ StreamingInsertTask nextTask = createStreamingInsertTask();
+ this.runningStreamTask = nextTask;
+ //todo: maybe fetch from txn attachment?
+ offsetProvider.updateOffset(task.getRunningOffset());
+ } finally {
+ lock.writeLock().unlock();
+ }
}
private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment
attachment) {
@@ -337,22 +352,37 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void beforeCommitted(TransactionState txnState) throws
TransactionException {
- ArrayList<Long> taskIds = new ArrayList<>();
- taskIds.add(runningStreamTask.getTaskId());
- List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
- if (loadJobs.size() != 1) {
- throw new TransactionException("load job not found, insert job id
is " + runningStreamTask.getTaskId());
+ boolean shouldRealseLock = false;
+ lock.writeLock().lock();
+ try {
+ ArrayList<Long> taskIds = new ArrayList<>();
+ taskIds.add(runningStreamTask.getTaskId());
+ List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
+ if (loadJobs.size() != 1) {
+ throw new TransactionException("load job not found, insert job
id is " + runningStreamTask.getTaskId());
+ }
+ LoadJob loadJob = loadJobs.get(0);
+
+ if (txnState.getTransactionId() != loadJob.getTransactionId()
+ ||
!runningStreamTask.getStatus().equals(TaskStatus.RUNNING)) {
+ shouldRealseLock = true;
+ throw new TransactionException("txn " +
txnState.getTransactionId() + "should be aborted.");
+ }
+
+ LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+ txnState.setTxnCommitAttachment(new
StreamingTaskTxnCommitAttachment(
+ getJobId(),
+ runningStreamTask.getTaskId(),
+ loadStatistic.getScannedRows(),
+ loadStatistic.getLoadBytes(),
+ loadStatistic.getFileNumber(),
+ loadStatistic.getTotalFileSizeB(),
+ runningStreamTask.getRunningOffset()));
+ } finally {
+ if (shouldRealseLock) {
+ lock.writeLock().unlock();
+ }
}
- LoadJob loadJob = loadJobs.get(0);
- LoadStatistic loadStatistic = loadJob.getLoadStatistic();
- txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
- getJobId(),
- runningStreamTask.getTaskId(),
- loadStatistic.getScannedRows(),
- loadStatistic.getLoadBytes(),
- loadStatistic.getFileNumber(),
- loadStatistic.getTotalFileSizeB(),
- runningStreamTask.getRunningOffset()));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index b1e22a0a7cb..c171173fd3c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -206,7 +206,11 @@ public class StreamingInsertTask {
streamingInsertJob.onStreamTaskFail(this);
}
- public void cancel(boolean needWaitCancelComplete) throws Exception {
+ public void cancel(boolean needWaitCancelComplete) {
+ if (TaskStatus.SUCCESS.equals(status) ||
TaskStatus.FAILED.equals(status)
+ || TaskStatus.CANCELED.equals(status)) {
+ return;
+ }
if (isCanceled.get()) {
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]