This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d08a418dd85 [branch-2.1](routine-load) optimize routine load job auto resume policy (#37373)
d08a418dd85 is described below
commit d08a418dd85d6eb5f01aecd1f5aaa7568c781ef1
Author: hui lai <[email protected]>
AuthorDate: Sun Jul 7 18:16:56 2024 +0800
[branch-2.1](routine-load) optimize routine load job auto resume policy (#37373)
Cherry-pick of #35266.
---
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../doris/load/routineload/RoutineLoadJob.java | 3 +-
.../doris/load/routineload/RoutineLoadManager.java | 3 +-
.../doris/load/routineload/ScheduleRule.java | 40 ++++++++++------------
.../load/routineload/RoutineLoadManagerTest.java | 11 +++---
5 files changed, 27 insertions(+), 32 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 26ef8852b45..94d5725c38a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1257,7 +1257,7 @@ public class Config extends ConfigBase {
* a period for auto resume routine load
*/
@ConfField(mutable = true, masterOnly = true)
- public static int period_of_auto_resume_min = 5;
+ public static int period_of_auto_resume_min = 10;
/**
* If set to true, the backend will be automatically dropped after
finishing decommission.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8c8dd7eaad9..130bd87b018 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -224,9 +224,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected int currentTaskConcurrentNum;
protected RoutineLoadProgress progress;
- protected long firstResumeTimestamp; // the first resume time
+ protected long latestResumeTimestamp; // the latest resume time
protected long autoResumeCount;
- protected boolean autoResumeLock = false; //it can't auto resume iff true
// some other msg which need to show to user;
protected String otherMsg = "";
protected ErrorReason pauseReason;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 356262f8c2a..6121f34035c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -367,8 +367,7 @@ public class RoutineLoadManager implements Writable {
try {
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
- routineLoadJob.firstResumeTimestamp = 0;
- routineLoadJob.autoResumeLock = false;
+ routineLoadJob.latestResumeTimestamp = 0;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false
/* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
index 2b4ef7b297a..dfc47aa496e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java
@@ -31,6 +31,10 @@ import org.apache.logging.log4j.Logger;
public class ScheduleRule {
private static final Logger LOG = LogManager.getLogger(ScheduleRule.class);
+ private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
+
+ private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
+
private static int deadBeCount() {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
int total = systemInfoService.getAllBackendIds(false).size();
@@ -47,22 +51,10 @@ public class ScheduleRule {
if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) {
return false;
}
- if (jobRoutine.autoResumeLock) { //only manual resume for unlock
- if (LOG.isDebugEnabled()) {
- LOG.debug("routine load job {}'s autoResumeLock is true,
skip", jobRoutine.id);
- }
- return false;
- }
/*
* Handle all backends are down.
*/
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to auto reschedule routine load {},
firstResumeTimestamp: {}, autoResumeCount: {}, "
- + "pause reason: {}",
- jobRoutine.id, jobRoutine.firstResumeTimestamp,
jobRoutine.autoResumeCount,
- jobRoutine.pauseReason == null ? "null" :
jobRoutine.pauseReason.getCode().name());
- }
if (jobRoutine.pauseReason != null
&& jobRoutine.pauseReason.getCode() !=
InternalErrorCode.MANUAL_PAUSE_ERR
&& jobRoutine.pauseReason.getCode() !=
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
@@ -77,19 +69,25 @@ public class ScheduleRule {
return false;
}
- if (jobRoutine.firstResumeTimestamp == 0) { //the first resume
- jobRoutine.firstResumeTimestamp = System.currentTimeMillis();
+ if (jobRoutine.latestResumeTimestamp == 0) { //the first resume
+ jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
jobRoutine.autoResumeCount = 1;
return true;
} else {
long current = System.currentTimeMillis();
- if (current - jobRoutine.firstResumeTimestamp <
Config.period_of_auto_resume_min * 60000L) {
- if (jobRoutine.autoResumeCount >= 3) {
- jobRoutine.autoResumeLock = true; // locked Auto
Resume RoutineLoadJob
- return false;
+ if (current - jobRoutine.latestResumeTimestamp <
Config.period_of_auto_resume_min * 60000L) {
+ long autoResumeIntervalTimeSec =
+ Math.min((long) Math.pow(2,
jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
+ MAX_BACK_OFF_TIME_SEC);
+ if (current - jobRoutine.latestResumeTimestamp >
autoResumeIntervalTimeSec * 1000L) {
+ LOG.info("try to auto reschedule routine load {},
latestResumeTimestamp: {},"
+ + " autoResumeCount: {}, pause reason: {}",
+ jobRoutine.id,
jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
+ jobRoutine.pauseReason == null ? "null" :
jobRoutine.pauseReason.getCode().name());
+ jobRoutine.latestResumeTimestamp =
System.currentTimeMillis();
+ jobRoutine.autoResumeCount++;
+ return true;
}
- jobRoutine.autoResumeCount++;
- return true;
} else {
/**
* for example:
@@ -98,7 +96,7 @@ public class ScheduleRule {
* the third resume time at 10:20
* --> we must be reset counter because a new
period for AutoResume RoutineLoadJob
*/
- jobRoutine.firstResumeTimestamp = current;
+ jobRoutine.latestResumeTimestamp = current;
jobRoutine.autoResumeCount = 1;
return true;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 737936191e8..0eeb22ba2cb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -638,21 +638,20 @@ public class RoutineLoadManagerTest {
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED,
routineLoadJob.getState());
- // 第一次自动恢复
for (int i = 0; i < 3; i++) {
Deencapsulation.setField(routineLoadJob, "pauseReason",
new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, ""));
+ try {
+ Thread.sleep(((long) Math.pow(2, i) * 10 * 1000L));
+ } catch (InterruptedException e) {
+ throw new UserException("thread sleep failed");
+ }
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE,
routineLoadJob.getState());
Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.PAUSED);
- boolean autoResumeLock = Deencapsulation.getField(routineLoadJob,
"autoResumeLock");
- Assert.assertEquals(autoResumeLock, false);
}
- // 第四次自动恢复 就会锁定
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED,
routineLoadJob.getState());
- boolean autoResumeLock = Deencapsulation.getField(routineLoadJob,
"autoResumeLock");
- Assert.assertEquals(autoResumeLock, true);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]