This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 4db2ec2cc23 [fix](streaming job) fix some streaming job bug (#56221)
4db2ec2cc23 is described below
commit 4db2ec2cc237a555c242676f22e5680ec680baa2
Author: hui lai <[email protected]>
AuthorDate: Fri Sep 19 12:55:00 2025 +0800
[fix](streaming job) fix some streaming job bug (#56221)
### What problem does this PR solve?
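Fix several streaming job bugs:
- meta-service: rename `put_streaming_job_meta` to `update_streaming_job_meta`. When the commit attachment is missing, it now sets `INVALID_ARGUMENT`. Both `commit_txn_immediately` and `commit_txn_eventually` now log a warning and return when the update fails, instead of continuing the commit.
- `StreamingInsertJob`: add `readLock`/`readUnlock`/`writeLock`/`writeUnlock` helpers. `writeUnlock` only releases the write lock if the current thread holds it. Remove `needDelayScheduleTask`.
- `StreamingJobSchedulerTask`: auto-resume now moves the job back to `PENDING` instead of creating a task directly. The auto-resume counter is reset to 0 when a `PENDING` job starts running.
- `StreamingTaskScheduler`: drop the extra delay based on the last schedule timestamp. Catch and log exceptions only around `task.execute()`.
- `cloud.proto`: renumber the codes so that `JOB_CHECK_ALTER_VERSION = 5005` and `STREAMING_JOB_PROGRESS_NOT_FOUND = 5006`.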
---
cloud/src/meta-service/meta_service_txn.cpp | 22 +++++++++++----
.../insert/streaming/StreamingInsertJob.java | 32 +++++++++++++++-------
.../streaming/StreamingJobSchedulerTask.java | 4 +--
.../job/scheduler/StreamingTaskScheduler.java | 23 ++++++++--------
gensrc/proto/cloud.proto | 4 +--
5 files changed, 54 insertions(+), 31 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index fd09f80d74c..22b2a449231 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -610,12 +610,14 @@ void put_routine_load_progress(MetaServiceCode& code,
std::string& msg,
<< " routine load new progress: " <<
new_progress_info.ShortDebugString();
}
-void put_streaming_job_meta(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
- const CommitTxnRequest* request, Transaction* txn,
int64_t db_id) {
+void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
+ const std::string& instance_id, const
CommitTxnRequest* request,
+ Transaction* txn, int64_t db_id) {
std::stringstream ss;
int64_t txn_id = request->txn_id();
if (!request->has_commit_attachment()) {
- ss << "failed to get commit attachment from req, db_id=" << db_id << "
txn_id=" << txn_id;
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "missing commit attachment, db_id=" << db_id << " txn_id=" <<
txn_id;
msg = ss.str();
return;
}
@@ -1706,7 +1708,12 @@ void MetaServiceImpl::commit_txn_immediately(
if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
- put_streaming_job_meta(code, msg, instance_id, request, txn.get(),
db_id);
+ update_streaming_job_meta(code, msg, instance_id, request,
txn.get(), db_id);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" <<
txn_id
+ << " code=" << code << " msg=" << msg;
+ return;
+ }
}
LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
@@ -2104,7 +2111,12 @@ void MetaServiceImpl::commit_txn_eventually(
if (txn_info.load_job_source_type() ==
LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
- put_streaming_job_meta(code, msg, instance_id, request, txn.get(),
db_id);
+ update_streaming_job_meta(code, msg, instance_id, request,
txn.get(), db_id);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" <<
txn_id
+ << " code=" << code << " msg=" << msg;
+ return;
+ }
}
// save versions for partition
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 70463894a66..7f882757356 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -147,6 +147,24 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ if (lock.writeLock().isHeldByCurrentThread()) {
+ lock.writeLock().unlock();
+ }
+ }
+
private UnboundTVFRelation getCurrentTvf() {
if (baseCommand == null) {
ConnectContext ctx =
InsertTask.makeConnectContext(getCreateUser(), getCurrentDbName());
@@ -229,11 +247,6 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
- // When consumer to EOF, delay schedule task appropriately can avoid too
many small transactions.
- public boolean needDelayScheduleTask() {
- return System.currentTimeMillis() - lastScheduleTaskTimestamp >
jobProperties.getMaxIntervalSecond() * 1000;
- }
-
public boolean hasMoreDataToConsume() {
return offsetProvider.hasMoreDataToConsume();
}
@@ -268,7 +281,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.failureReason = new FailureReason(task.getErrMsg());
}
} finally {
- lock.writeLock().unlock();
+ writeUnlock();
}
updateJobStatus(JobStatus.PAUSED);
}
@@ -282,7 +295,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
//todo: maybe fetch from txn attachment?
offsetProvider.updateOffset(task.getRunningOffset());
} finally {
- lock.writeLock().unlock();
+ writeUnlock();
}
}
@@ -392,7 +405,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void beforeCommitted(TransactionState txnState) throws
TransactionException {
boolean shouldReleaseLock = false;
- lock.writeLock().lock();
+ writeLock();
try {
ArrayList<Long> taskIds = new ArrayList<>();
taskIds.add(runningStreamTask.getTaskId());
@@ -410,14 +423,13 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
runningStreamTask.getRunningOffset().toJson()));
} finally {
if (shouldReleaseLock) {
- lock.writeLock().unlock();
+ writeUnlock();
}
}
}
@Override
public void beforeAborted(TransactionState txnState) throws
TransactionException {
-
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index ec4eb0036d0..de46ccedf50 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -49,6 +49,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
case PENDING:
streamingInsertJob.createStreamingInsertTask();
streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
+ streamingInsertJob.setAutoResumeCount(0);
break;
case RUNNING:
streamingInsertJob.fetchMeta();
@@ -79,8 +80,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
if (autoResumeCount < Long.MAX_VALUE) {
streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
}
- streamingInsertJob.createStreamingInsertTask();
- streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
+ streamingInsertJob.updateJobStatus(JobStatus.PENDING);
return;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 6e664b404da..8cf06bbc1a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.MasterDaemon;
-import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
@@ -74,38 +73,38 @@ public class StreamingTaskScheduler extends MasterDaemon {
private void scheduleTasks(List<StreamingInsertTask> tasks) {
for (StreamingInsertTask task : tasks) {
threadPool.execute(() -> {
- try {
- scheduleOneTask(task);
- } catch (Exception e) {
- log.error("Failed to schedule task, task id: {}, job id:
{}", task.getTaskId(), task.getJobId(), e);
- }
+ scheduleOneTask(task);
});
}
}
- private void scheduleOneTask(StreamingInsertTask task) throws JobException
{
+ private void scheduleOneTask(StreamingInsertTask task) {
StreamingInsertJob job = (StreamingInsertJob)
Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
if (job == null) {
log.warn("Job not found, job id: {}", task.getJobId());
return;
}
+
+ // reject invalid task
if (!job.needScheduleTask()) {
log.info("do not need to schedule invalid task, task id: {}, job
id: {}",
task.getTaskId(), task.getJobId());
return;
}
+ // reject task if no more data to consume
if (!job.hasMoreDataToConsume()) {
scheduleTaskWithDelay(task, 500);
return;
}
- if (job.getLastScheduleTaskTimestamp() != -1 &&
job.needDelayScheduleTask()) {
- scheduleTaskWithDelay(task, 500);
- return;
- }
log.info("prepare to schedule task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId());
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
- task.execute();
+
+ try {
+ task.execute();
+ } catch (Exception e) {
+ log.error("Failed to execute task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId(), e);
+ }
}
private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs)
{
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index c9382ed2980..6259d6d0da3 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1716,8 +1716,8 @@ enum MetaServiceCode {
JOB_ALREADY_SUCCESS = 5002;
ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
- STREAMING_JOB_PROGRESS_NOT_FOUND = 5005;
- JOB_CHECK_ALTER_VERSION = 5006;
+ JOB_CHECK_ALTER_VERSION = 5005;
+ STREAMING_JOB_PROGRESS_NOT_FOUND = 5006;
// Rate limit
MAX_QPS_LIMIT = 6001;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]