This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a301e313569 [fix](job) fix streaming job repeated load after FE master
change in cloud mode (#57205)
a301e313569 is described below
commit a301e313569921d9d208b430d8e1286fddd92245
Author: hui lai <[email protected]>
AuthorDate: Wed Oct 22 09:12:50 2025 +0800
[fix](job) fix streaming job repeated load after FE master change in cloud
mode (#57205)
### What problem does this PR solve?
Fix the streaming job offset becoming incorrect after an FE master change,
which causes repeated loads in cloud mode.
---
.../insert/streaming/StreamingInsertJob.java | 6 +++---
.../insert/streaming/StreamingJobSchedulerTask.java | 21 ++++++++++++++++++---
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 783de6348af..feb3254c546 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -601,7 +601,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
succeedTaskCount.incrementAndGet();
}
- public void replayOnCloudMode() throws UserException {
+ public void replayOnCloudMode() throws JobException {
Cloud.GetStreamingTaskCommitAttachRequest.Builder builder =
Cloud.GetStreamingTaskCommitAttachRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
@@ -617,12 +617,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
log.warn("not found streaming job progress, response: {}",
response);
return;
} else {
- throw new UserException(response.getStatus().getMsg());
+ throw new JobException(response.getStatus().getMsg());
}
}
} catch (RpcException e) {
log.info("failed to get streaming task commit attach {}", e);
- throw new UserException(e.getMessage());
+ throw new JobException(e.getMessage());
}
StreamingTaskTxnCommitAttachment commitAttach =
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 0cd023b7d1d..503bc9e631f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -18,6 +18,7 @@
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.util.TimeUtils;
@@ -47,9 +48,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
public void run() throws JobException {
switch (streamingInsertJob.getJobStatus()) {
case PENDING:
- streamingInsertJob.createStreamingInsertTask();
- streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
- streamingInsertJob.setAutoResumeCount(0);
+ handlePendingState();
break;
case RUNNING:
streamingInsertJob.fetchMeta();
@@ -62,6 +61,22 @@ public class StreamingJobSchedulerTask extends AbstractTask {
}
}
+ private void handlePendingState() throws JobException {
+ if (Config.isCloudMode()) {
+ try {
+ streamingInsertJob.replayOnCloudMode();
+ } catch (JobException e) {
+ streamingInsertJob.setFailureReason(
+ new FailureReason(InternalErrorCode.INTERNAL_ERR,
e.getMessage()));
+ streamingInsertJob.updateJobStatus(JobStatus.PAUSED);
+ return;
+ }
+ }
+ streamingInsertJob.createStreamingInsertTask();
+ streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
+ streamingInsertJob.setAutoResumeCount(0);
+ }
+
private void autoResumeHandler() throws JobException {
final FailureReason failureReason =
streamingInsertJob.getFailureReason();
final long latestAutoResumeTimestamp =
streamingInsertJob.getLatestAutoResumeTimestamp();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]