This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 4db2ec2cc23 [fix](streaming job) fix some streaming job bug (#56221)
4db2ec2cc23 is described below

commit 4db2ec2cc237a555c242676f22e5680ec680baa2
Author: hui lai <[email protected]>
AuthorDate: Fri Sep 19 12:55:00 2025 +0800

    [fix](streaming job) fix some streaming job bug (#56221)
    
    ### What problem does this PR solve?
    
    Fix some streaming job bugs.
---
 cloud/src/meta-service/meta_service_txn.cpp        | 22 +++++++++++----
 .../insert/streaming/StreamingInsertJob.java       | 32 +++++++++++++++-------
 .../streaming/StreamingJobSchedulerTask.java       |  4 +--
 .../job/scheduler/StreamingTaskScheduler.java      | 23 ++++++++--------
 gensrc/proto/cloud.proto                           |  4 +--
 5 files changed, 54 insertions(+), 31 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index fd09f80d74c..22b2a449231 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -610,12 +610,14 @@ void put_routine_load_progress(MetaServiceCode& code, 
std::string& msg,
               << " routine load new progress: " << 
new_progress_info.ShortDebugString();
 }
 
-void put_streaming_job_meta(MetaServiceCode& code, std::string& msg, const 
std::string& instance_id,
-                            const CommitTxnRequest* request, Transaction* txn, 
int64_t db_id) {
+void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
+                               const std::string& instance_id, const 
CommitTxnRequest* request,
+                               Transaction* txn, int64_t db_id) {
     std::stringstream ss;
     int64_t txn_id = request->txn_id();
     if (!request->has_commit_attachment()) {
-        ss << "failed to get commit attachment from req, db_id=" << db_id << " 
txn_id=" << txn_id;
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "missing commit attachment, db_id=" << db_id << " txn_id=" << 
txn_id;
         msg = ss.str();
         return;
     }
@@ -1706,7 +1708,12 @@ void MetaServiceImpl::commit_txn_immediately(
 
         if (txn_info.load_job_source_type() ==
             LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
-            put_streaming_job_meta(code, msg, instance_id, request, txn.get(), 
db_id);
+            update_streaming_job_meta(code, msg, instance_id, request, 
txn.get(), db_id);
+            if (code != MetaServiceCode::OK) {
+                LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << 
txn_id
+                             << " code=" << code << " msg=" << msg;
+                return;
+            }
         }
 
         LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
@@ -2104,7 +2111,12 @@ void MetaServiceImpl::commit_txn_eventually(
 
         if (txn_info.load_job_source_type() ==
             LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
-            put_streaming_job_meta(code, msg, instance_id, request, txn.get(), 
db_id);
+            update_streaming_job_meta(code, msg, instance_id, request, 
txn.get(), db_id);
+            if (code != MetaServiceCode::OK) {
+                LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << 
txn_id
+                             << " code=" << code << " msg=" << msg;
+                return;
+            }
         }
 
         // save versions for partition
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 70463894a66..7f882757356 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -147,6 +147,24 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        if (lock.writeLock().isHeldByCurrentThread()) {
+            lock.writeLock().unlock();
+        }
+    }
+
     private UnboundTVFRelation getCurrentTvf() {
         if (baseCommand == null) {
             ConnectContext ctx = 
InsertTask.makeConnectContext(getCreateUser(), getCurrentDbName());
@@ -229,11 +247,6 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
-    // When consumer to EOF, delay schedule task appropriately can avoid too 
many small transactions.
-    public boolean needDelayScheduleTask() {
-        return System.currentTimeMillis() - lastScheduleTaskTimestamp > 
jobProperties.getMaxIntervalSecond() * 1000;
-    }
-
     public boolean hasMoreDataToConsume() {
         return offsetProvider.hasMoreDataToConsume();
     }
@@ -268,7 +281,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 this.failureReason = new FailureReason(task.getErrMsg());
             }
         } finally {
-            lock.writeLock().unlock();
+            writeUnlock();
         }
         updateJobStatus(JobStatus.PAUSED);
     }
@@ -282,7 +295,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             //todo: maybe fetch from txn attachment?
             offsetProvider.updateOffset(task.getRunningOffset());
         } finally {
-            lock.writeLock().unlock();
+            writeUnlock();
         }
     }
 
@@ -392,7 +405,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @Override
     public void beforeCommitted(TransactionState txnState) throws 
TransactionException {
         boolean shouldReleaseLock = false;
-        lock.writeLock().lock();
+        writeLock();
         try {
             ArrayList<Long> taskIds = new ArrayList<>();
             taskIds.add(runningStreamTask.getTaskId());
@@ -410,14 +423,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         runningStreamTask.getRunningOffset().toJson()));
         } finally {
             if (shouldReleaseLock) {
-                lock.writeLock().unlock();
+                writeUnlock();
             }
         }
     }
 
     @Override
     public void beforeAborted(TransactionState txnState) throws 
TransactionException {
-
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index ec4eb0036d0..de46ccedf50 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -49,6 +49,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
             case PENDING:
                 streamingInsertJob.createStreamingInsertTask();
                 streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
+                streamingInsertJob.setAutoResumeCount(0);
                 break;
             case RUNNING:
                 streamingInsertJob.fetchMeta();
@@ -79,8 +80,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
                 if (autoResumeCount < Long.MAX_VALUE) {
                     streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
                 }
-                streamingInsertJob.createStreamingInsertTask();
-                streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
+                streamingInsertJob.updateJobStatus(JobStatus.PENDING);
                 return;
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 6e664b404da..8cf06bbc1a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.util.MasterDaemon;
-import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
 
@@ -74,38 +73,38 @@ public class StreamingTaskScheduler extends MasterDaemon {
     private void scheduleTasks(List<StreamingInsertTask> tasks) {
         for (StreamingInsertTask task : tasks) {
             threadPool.execute(() -> {
-                try {
-                    scheduleOneTask(task);
-                } catch (Exception e) {
-                    log.error("Failed to schedule task, task id: {}, job id: 
{}", task.getTaskId(), task.getJobId(), e);
-                }
+                scheduleOneTask(task);
             });
         }
     }
 
-    private void scheduleOneTask(StreamingInsertTask task) throws JobException 
{
+    private void scheduleOneTask(StreamingInsertTask task) {
         StreamingInsertJob job = (StreamingInsertJob) 
Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
         if (job == null) {
             log.warn("Job not found, job id: {}", task.getJobId());
             return;
         }
+
+        // reject invalid task
         if (!job.needScheduleTask()) {
             log.info("do not need to schedule invalid task, task id: {}, job 
id: {}",
                         task.getTaskId(), task.getJobId());
             return;
         }
+        // reject task if no more data to consume
         if (!job.hasMoreDataToConsume()) {
             scheduleTaskWithDelay(task, 500);
             return;
         }
-        if (job.getLastScheduleTaskTimestamp() != -1 && 
job.needDelayScheduleTask()) {
-            scheduleTaskWithDelay(task, 500);
-            return;
-        }
         log.info("prepare to schedule task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId());
         job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
         
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
-        task.execute();
+
+        try {
+            task.execute();
+        } catch (Exception e) {
+            log.error("Failed to execute task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId(), e);
+        }
     }
 
     private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) 
{
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index c9382ed2980..6259d6d0da3 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1716,8 +1716,8 @@ enum MetaServiceCode {
     JOB_ALREADY_SUCCESS = 5002;
     ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
     ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
-    STREAMING_JOB_PROGRESS_NOT_FOUND = 5005;
-    JOB_CHECK_ALTER_VERSION = 5006;
+    JOB_CHECK_ALTER_VERSION = 5005;
+    STREAMING_JOB_PROGRESS_NOT_FOUND = 5006;
 
     // Rate limit
     MAX_QPS_LIMIT = 6001;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to