This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a301e313569 [fix](job) fix streaming job repeated load after FE master 
change in cloud mode (#57205)
a301e313569 is described below

commit a301e313569921d9d208b430d8e1286fddd92245
Author: hui lai <[email protected]>
AuthorDate: Wed Oct 22 09:12:50 2025 +0800

    [fix](job) fix streaming job repeated load after FE master change in cloud 
mode (#57205)
    
    ### What problem does this PR solve?
    
    Fix the streaming job offset becoming incorrect after an FE master change,
    which causes repeated loads in cloud mode.
---
 .../insert/streaming/StreamingInsertJob.java        |  6 +++---
 .../insert/streaming/StreamingJobSchedulerTask.java | 21 ++++++++++++++++++---
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 783de6348af..feb3254c546 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -601,7 +601,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         succeedTaskCount.incrementAndGet();
     }
 
-    public void replayOnCloudMode() throws UserException {
+    public void replayOnCloudMode() throws JobException {
         Cloud.GetStreamingTaskCommitAttachRequest.Builder builder =
                 Cloud.GetStreamingTaskCommitAttachRequest.newBuilder();
         builder.setCloudUniqueId(Config.cloud_unique_id);
@@ -617,12 +617,12 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                     log.warn("not found streaming job progress, response: {}", 
response);
                     return;
                 } else {
-                    throw new UserException(response.getStatus().getMsg());
+                    throw new JobException(response.getStatus().getMsg());
                 }
             }
         } catch (RpcException e) {
             log.info("failed to get streaming task commit attach {}", e);
-            throw new UserException(e.getMessage());
+            throw new JobException(e.getMessage());
         }
 
         StreamingTaskTxnCommitAttachment commitAttach =
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 0cd023b7d1d..503bc9e631f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.util.TimeUtils;
@@ -47,9 +48,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
     public void run() throws JobException {
         switch (streamingInsertJob.getJobStatus()) {
             case PENDING:
-                streamingInsertJob.createStreamingInsertTask();
-                streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
-                streamingInsertJob.setAutoResumeCount(0);
+                handlePendingState();
                 break;
             case RUNNING:
                 streamingInsertJob.fetchMeta();
@@ -62,6 +61,22 @@ public class StreamingJobSchedulerTask extends AbstractTask {
         }
     }
 
+    private void handlePendingState() throws JobException {
+        if (Config.isCloudMode()) {
+            try {
+                streamingInsertJob.replayOnCloudMode();
+            } catch (JobException e) {
+                streamingInsertJob.setFailureReason(
+                    new FailureReason(InternalErrorCode.INTERNAL_ERR, 
e.getMessage()));
+                streamingInsertJob.updateJobStatus(JobStatus.PAUSED);
+                return;
+            }
+        }
+        streamingInsertJob.createStreamingInsertTask();
+        streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
+        streamingInsertJob.setAutoResumeCount(0);
+    }
+
     private void autoResumeHandler() throws JobException {
         final FailureReason failureReason = 
streamingInsertJob.getFailureReason();
         final long latestAutoResumeTimestamp = 
streamingInsertJob.getLatestAutoResumeTimestamp();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to