This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 904a6c0fc1a [feat](job)Internal job cancellation immediately and the
strong association with the STARTS parameter (#36805)
904a6c0fc1a is described below
commit 904a6c0fc1a804520285533de874fe4d0ffff2c1
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Jul 10 10:08:19 2024 +0800
[feat](job)Internal job cancellation immediately and the strong association
with the STARTS parameter (#36805)
## Proposed changes
For internal tasks, such as MTMV, the start time may already be set, or
the time may be adjusted immediately.
<!--Describe your changes.-->
---
.../src/main/java/org/apache/doris/analysis/CreateJobStmt.java | 3 +++
.../java/org/apache/doris/job/base/JobExecutionConfiguration.java | 8 +++-----
.../src/main/java/org/apache/doris/job/base/TimerDefinition.java | 8 +-------
.../main/java/org/apache/doris/job/scheduler/JobScheduler.java | 4 ++++
.../org/apache/doris/job/base/JobExecutionConfigurationTest.java | 2 +-
5 files changed, 12 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 367d03fa867..8a8db0a3d1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -128,6 +128,7 @@ public class CreateJobStmt extends DdlStmt {
if (null != onceJobStartTimestamp) {
if
(onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
jobExecutionConfiguration.setImmediate(true);
+ timerDefinition.setStartTimeMs(System.currentTimeMillis());
} else {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
}
@@ -149,6 +150,8 @@ public class CreateJobStmt extends DdlStmt {
if (null != startsTimeStamp) {
if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
jobExecutionConfiguration.setImmediate(true);
+ //To avoid immediate re-scheduling, set the start time of the
timer 100ms before the current time.
+ timerDefinition.setStartTimeMs(System.currentTimeMillis());
} else {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 46bc2c71ea2..301222d5434 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -57,9 +57,7 @@ public class JobExecutionConfiguration {
if (executeType == JobExecuteType.INSTANT || executeType ==
JobExecuteType.MANUAL) {
return;
}
-
- checkTimerDefinition(immediate);
-
+ checkTimerDefinition();
if (executeType == JobExecuteType.ONE_TIME) {
validateStartTimeMs();
return;
@@ -80,12 +78,12 @@ public class JobExecutionConfiguration {
}
}
- private void checkTimerDefinition(boolean immediate) {
+ private void checkTimerDefinition() {
if (timerDefinition == null) {
throw new IllegalArgumentException(
"timerDefinition cannot be null when executeType is not
instant or manual");
}
- timerDefinition.checkParams(immediate);
+ timerDefinition.checkParams();
}
private void validateStartTimeMs() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
index bcff4216c6e..9068a18f693 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
@@ -38,13 +38,7 @@ public class TimerDefinition {
private Long latestSchedulerTimeMs;
- public void checkParams(boolean immediate) {
- if (null != startTimeMs && immediate) {
- throw new IllegalArgumentException("startTimeMs must be null when
immediate is true");
- }
- if (null == startTimeMs && immediate) {
- startTimeMs = System.currentTimeMillis();
- }
+ public void checkParams() {
if (null == startTimeMs) {
startTimeMs = System.currentTimeMillis() +
intervalUnit.getIntervalMs(interval);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 2100511d22b..7f0133bf957 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -124,6 +124,10 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
}
+ if (job.getJobConfig().isImmediate() &&
JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) {
+ schedulerInstantJob(job, TaskType.SCHEDULED, null);
+ return;
+ }
//RECURRING job and immediate is true
if (job.getJobConfig().isImmediate()) {
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index 6d01f09c5ea..24c486baff8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -75,8 +75,8 @@ public class JobExecutionConfigurationTest {
JobExecutionConfiguration configuration = new
JobExecutionConfiguration();
configuration.setExecuteType(JobExecuteType.ONE_TIME);
configuration.setImmediate(true);
- configuration.setImmediate(true);
TimerDefinition timerDefinition = new TimerDefinition();
+ timerDefinition.setStartTimeMs(0L);
configuration.setTimerDefinition(timerDefinition);
configuration.checkParams();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]