This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a4eb078cbaf [branch-2.0](routine-load) enhance auto resume to keep routine load stable (#32590)
a4eb078cbaf is described below
commit a4eb078cbaf62c184a858009613f6dc99edbc2ea
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Mar 21 14:35:57 2024 +0800
[branch-2.0](routine-load) enhance auto resume to keep routine load stable (#32590)
---
.../java/org/apache/doris/common/InternalErrorCode.java | 3 ++-
.../org/apache/doris/load/routineload/RoutineLoadJob.java | 6 +++---
.../org/apache/doris/load/routineload/ScheduleRule.java | 15 ++++++++++-----
3 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index 2bbd5c58efa..b871fd198cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -34,7 +34,8 @@ public enum InternalErrorCode {
MANUAL_STOP_ERR(101),
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
- TASKS_ABORT_ERR(104);
+ TASKS_ABORT_ERR(104),
+ CANNOT_RESUME_ERR(105);
private long errCode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0d9ae516351..a55f0a02124 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1133,7 +1133,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
+ " with reason: " +
txnStatusChangeReasonString
+ " please check the jsonpaths";
updateState(JobState.PAUSED,
- new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
false /* not replay */);
return;
case OFFSET_OUT_OF_RANGE:
@@ -1146,14 +1146,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
+ " using the Alter ROUTINE LOAD
command to modify it,"
+ " and resume the job";
updateState(JobState.PAUSED,
- new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
false /* not replay */);
return;
case PAUSE:
msg = "be " + taskBeId + " abort task "
+ "with reason: " +
txnStatusChangeReasonString;
updateState(JobState.PAUSED,
- new
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+ new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, msg),
false /* not replay */);
return;
default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index a0aab5c1cb9..052f22bf3de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -55,11 +55,16 @@ public class ScheduleRule {
/*
* Handle all backends are down.
*/
- LOG.debug("try to auto reschedule routine load {},
firstResumeTimestamp: {}, autoResumeCount: {}, "
- + "pause reason: {}",
- jobRoutine.id, jobRoutine.firstResumeTimestamp,
jobRoutine.autoResumeCount,
- jobRoutine.pauseReason == null ? "null" :
jobRoutine.pauseReason.getCode().name());
- if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode()
== InternalErrorCode.REPLICA_FEW_ERR) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to auto reschedule routine load {},
firstResumeTimestamp: {}, autoResumeCount: {}, "
+ + "pause reason: {}",
+ jobRoutine.id, jobRoutine.firstResumeTimestamp,
jobRoutine.autoResumeCount,
+ jobRoutine.pauseReason == null ? "null" :
jobRoutine.pauseReason.getCode().name());
+ }
+ if (jobRoutine.pauseReason != null
+ && jobRoutine.pauseReason.getCode() !=
InternalErrorCode.MANUAL_PAUSE_ERR
+ && jobRoutine.pauseReason.getCode() !=
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
+ && jobRoutine.pauseReason.getCode() !=
InternalErrorCode.CANNOT_RESUME_ERR) {
int dead = deadBeCount(jobRoutine.clusterName);
if (dead > Config.max_tolerable_backend_down_num) {
LOG.debug("dead backend num {} is larger than config {}, "
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org