This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8af666b1925 [branch-2.1](routine-load) enhance auto resume to keep routine load stable (#32689)
8af666b1925 is described below

commit 8af666b192560a6161f93898c883b0af65a3bd55
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Mar 22 18:07:12 2024 +0800

    [branch-2.1](routine-load) enhance auto resume to keep routine load stable (#32689)
    
    * enhance auto resume to keep routine load stable
    
    * do not auto resume if job cannot resume definitely (#32419)
---
 .../src/main/java/org/apache/doris/common/InternalErrorCode.java    | 3 ++-
 .../main/java/org/apache/doris/load/routineload/RoutineLoadJob.java | 6 +++---
 .../main/java/org/apache/doris/load/routineload/ScheduleRule.java   | 5 ++++-
 .../src/main/java/org/apache/doris/service/FrontendServiceImpl.java | 2 +-
 4 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index 2bbd5c58efa..b871fd198cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -34,7 +34,8 @@ public enum InternalErrorCode {
     MANUAL_STOP_ERR(101),
     TOO_MANY_FAILURE_ROWS_ERR(102),
     CREATE_TASKS_ERR(103),
-    TASKS_ABORT_ERR(104);
+    TASKS_ABORT_ERR(104),
+    CANNOT_RESUME_ERR(105);
 
     private long errCode;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0f8341150fe..bb271a08d6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1184,7 +1184,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                                         + " with reason: " + txnStatusChangeReasonString
                                         + " please check the jsonpaths";
                                 updateState(JobState.PAUSED,
-                                        new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
                                         false /* not replay */);
                                 return;
                             case OFFSET_OUT_OF_RANGE:
@@ -1197,14 +1197,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                                         + " using the Alter ROUTINE LOAD command to modify it,"
                                         + " and resume the job";
                                 updateState(JobState.PAUSED,
-                                        new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
                                         false /* not replay */);
                                 return;
                             case PAUSE:
                                 msg = "be " + taskBeId + " abort task "
                                         + "with reason: " + txnStatusChangeReasonString;
                                 updateState(JobState.PAUSED,
-                                        new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
                                         false /* not replay */);
                                 return;
                             default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index e2eaf4d825d..2b4ef7b297a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -63,7 +63,10 @@ public class ScheduleRule {
                     jobRoutine.id, jobRoutine.firstResumeTimestamp, jobRoutine.autoResumeCount,
                     jobRoutine.pauseReason == null ? "null" : jobRoutine.pauseReason.getCode().name());
         }
-        if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode() == InternalErrorCode.REPLICA_FEW_ERR) {
+        if (jobRoutine.pauseReason != null
+                && jobRoutine.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
+                && jobRoutine.pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
+                && jobRoutine.pauseReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) {
             int dead = deadBeCount();
             if (dead > Config.max_tolerable_backend_down_num) {
                 if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a332f6f5031..8931d7b0859 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1979,7 +1979,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             try {
                 RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager()
                         .getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
-                routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
+                routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR,
                             "failed to get stream load plan, " + exception.getMessage()), false);
             } catch (UserException e) {
                 LOG.warn("catch update routine load job error.", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to