This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new a75f0576f5 [INLONG-10396][Manager] Support build-in schedule base on
quartz (#10412)
a75f0576f5 is described below
commit a75f0576f586e2d0c383915a608c4a13707ed7f2
Author: AloysZhang <[email protected]>
AuthorDate: Thu Jun 13 22:44:42 2024 +0800
[INLONG-10396][Manager] Support build-in schedule base on quartz (#10412)
---
.../inlong/manager/dao/entity/ScheduleEntity.java | 2 +-
.../inlong/manager/pojo/schedule/ScheduleInfo.java | 4 +-
.../manager/pojo/schedule/ScheduleInfoRequest.java | 4 +-
inlong-manager/manager-schedule/pom.xml | 31 ++++
.../org/apache/inlong/schedule/ScheduleType.java | 45 ++++++
.../org/apache/inlong/schedule/ScheduleUnit.java | 48 ++++++
.../exception/QuartzScheduleException.java | 32 ++++
.../schedule/quartz/QuartzOfflineSyncJob.java | 40 +++++
.../schedule/quartz/QuartzScheduleClient.java | 50 +++++++
.../schedule/quartz/QuartzScheduleEngine.java | 156 ++++++++++++++++++++
.../schedule/quartz/QuartzSchedulerListener.java | 137 +++++++++++++++++
.../apache/inlong/schedule/util/ScheduleUtils.java | 139 +++++++++++++++++
.../apache/inlong/schedule/BaseScheduleTest.java | 116 +++++++++++++++
.../org/apache/inlong/schedule/quartz/MockJob.java | 48 ++++++
.../schedule/quartz/QuartzScheduleEngineTest.java | 161 ++++++++++++++++++++
.../inlong/schedule/util/ScheduleUtilsTest.java | 164 +++++++++++++++++++++
.../manager-schedule/src/test/resources/log4j2.xml | 46 ++++++
.../main/resources/h2/apache_inlong_manager.sql | 2 +-
.../manager-web/sql/apache_inlong_manager.sql | 2 +-
inlong-manager/manager-web/sql/changes-1.13.0.sql | 2 +-
inlong-manager/pom.xml | 2 +-
21 files changed, 1222 insertions(+), 9 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
index 75237343cb..b2d49cd43c 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -34,7 +34,7 @@ public class ScheduleEntity implements Serializable {
// schedule type, support [normal, crontab], 0 for normal and 1 for crontab
private Integer scheduleType;
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneway]
- // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+ // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
private String scheduleUnit;
private Integer scheduleInterval;
// schedule start time, long type timestamp
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index 13afce70a4..2386d817bb 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -50,7 +50,7 @@ public class ScheduleInfo {
private Integer scheduleType;
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneway]
- // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+ // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;
@@ -73,7 +73,7 @@ public class ScheduleInfo {
private Integer taskParallelism;
@ApiModelProperty("Schedule task parallelism")
- private Integer crontabExpression;
+ private String crontabExpression;
@ApiModelProperty(value = "Version number")
@NotNull(groups = UpdateValidation.class, message = "version cannot be
null")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index eff1660719..b3c117da9a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -44,7 +44,7 @@ public class ScheduleInfoRequest {
private Integer scheduleType;
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneway]
- // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+ // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;
@@ -67,7 +67,7 @@ public class ScheduleInfoRequest {
private Integer taskParallelism;
@ApiModelProperty("Schedule task parallelism")
- private Integer crontabExpression;
+ private String crontabExpression;
@ApiModelProperty(value = "Version number")
@NotNull(groups = UpdateValidation.class, message = "version cannot be
null")
diff --git a/inlong-manager/manager-schedule/pom.xml
b/inlong-manager/manager-schedule/pom.xml
index f3a4e0a41f..8d598b47ca 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -27,6 +27,7 @@
<artifactId>manager-schedule</artifactId>
<properties>
+ <quartz.version>2.3.2</quartz.version>
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@@ -37,5 +38,35 @@
<artifactId>manager-pojo</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>${quartz.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.mchange</groupId>
+ <artifactId>c3p0</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.mchange</groupId>
+ <artifactId>mchange-commons-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP-java7</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
new file mode 100644
index 0000000000..0f296fec42
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule;
+
+import lombok.Getter;
+
+/**
+ * Type of schedule, including normal and crontab.
+ * */
+@Getter
+public enum ScheduleType {
+
+ NORMAL(0),
+ CRONTAB(1);
+
+ private final int code;
+
+ ScheduleType(int code) {
+ this.code = code;
+ }
+
+ public static ScheduleType fromCode(int code) {
+ for (ScheduleType type : ScheduleType.values()) {
+ if (type.code == code) {
+ return type;
+ }
+ }
+ return null;
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
new file mode 100644
index 0000000000..7b4d60779f
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule;
+
+import lombok.Getter;
+
+@Getter
+public enum ScheduleUnit {
+
+ YEAR("Y"),
+ MONTH("M"),
+ DAY("D"),
+ WEEK("W"),
+ HOUR("H"),
+ MINUTE("I"),
+ SECOND("S"),
+ ONE_WAY("O");
+
+ final String unit;
+
+ ScheduleUnit(String unit) {
+ this.unit = unit;
+ }
+
+ public static ScheduleUnit getScheduleUnit(String unit) {
+ for (ScheduleUnit scheduleUnit : ScheduleUnit.values()) {
+ if (scheduleUnit.unit.equalsIgnoreCase(unit)) {
+ return scheduleUnit;
+ }
+ }
+ return null;
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
new file mode 100644
index 0000000000..2d0ff005bd
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.exception;
+
+/**
+ * Exceptions occur in the schedule procedure.
+ * */
+public class QuartzScheduleException extends RuntimeException {
+
+ public QuartzScheduleException(String message) {
+ super(message);
+ }
+
+ public QuartzScheduleException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
new file mode 100644
index 0000000000..27628b3320
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class QuartzOfflineSyncJob implements Job {
+
+ private ScheduleInfo scheduleInfo;
+
+ @Override
+ public void execute(JobExecutionContext context) throws
JobExecutionException {
+ // TODO: complete the offline sync logic
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
new file mode 100644
index 0000000000..41fc8814a1
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.ScheduleEngineClient;
+
+/**
+ * Built-in implementation of schedule engine client corresponding with {@link
QuartzScheduleEngine}.
+ * QuartzScheduleClient simply invokes the {@link QuartzScheduleEngine} to
register/unregister/update
+ * schedule info instead of calling a remote schedule service.
+ * */
+public class QuartzScheduleClient implements ScheduleEngineClient {
+
+ private final QuartzScheduleEngine scheduleEngine;
+
+ public QuartzScheduleClient(QuartzScheduleEngine scheduleEngine) {
+ this.scheduleEngine = scheduleEngine;
+ }
+
+ @Override
+ public boolean register(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleRegister(scheduleInfo);
+ }
+
+ @Override
+ public boolean unregister(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleUnregister(scheduleInfo);
+ }
+
+ @Override
+ public boolean update(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleUpdate(scheduleInfo);
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
new file mode 100644
index 0000000000..31736f1887
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.ScheduleEngine;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzJobDetail;
+import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzTrigger;
+
+/**
+ * The default implementation of schedule engine based on Quartz scheduler.
Response for processing
+ * the register/unregister/update requests from {@link QuartzScheduleClient}
+ * */
+@Getter
+public class QuartzScheduleEngine implements ScheduleEngine {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QuartzScheduleEngine.class);
+
+ private final Scheduler scheduler;
+ private final Set<String> scheduledJobSet = new HashSet<>();
+
+ public QuartzScheduleEngine() {
+ try {
+ this.scheduler = new StdSchedulerFactory().getScheduler();
+ LOGGER.info("Quartz scheduler engine initialized");
+ } catch (SchedulerException e) {
+ throw new QuartzScheduleException("Failed to init quartz scheduler
", e);
+ }
+ }
+
+ @Override
+ public void start() {
+ try {
+ // add listener
+ scheduler.getListenerManager().addSchedulerListener(new
QuartzSchedulerListener(this));
+ scheduler.start();
+ LOGGER.info("Quartz scheduler engine started");
+ } catch (SchedulerException e) {
+ throw new QuartzScheduleException("Failed to start quartz
scheduler ", e);
+ }
+ }
+
+ /**
+ * Clean job info from scheduledJobSet after trigger finalized.
+ * */
+ public boolean triggerFinalized(Trigger trigger) {
+ String jobName = trigger.getJobKey().getName();
+ LOGGER.info("Trigger finalized for job {}", jobName);
+ return scheduledJobSet.remove(jobName);
+ }
+
+ /**
+ * Handle schedule register.
+ * @param scheduleInfo schedule info to register
+ * */
+ @Override
+ public boolean handleRegister(ScheduleInfo scheduleInfo) {
+ return handleRegister(scheduleInfo, QuartzOfflineSyncJob.class);
+ }
+
+ @VisibleForTesting
+ public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends
QuartzOfflineSyncJob> clz) {
+ if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
+ throw new QuartzScheduleException("Group " +
scheduleInfo.getInlongGroupId() + " is already registered");
+ }
+ JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz);
+ Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo);
+ try {
+ scheduler.scheduleJob(jobDetail, trigger);
+ scheduledJobSet.add(scheduleInfo.getInlongGroupId());
+ LOGGER.info("Registered new schedule info for {}",
scheduleInfo.getInlongGroupId());
+ } catch (SchedulerException e) {
+ throw new QuartzScheduleException(e.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * Handle schedule unregister.
+ * @param scheduleInfo schedule info to unregister
+ * */
+ @Override
+ public boolean handleUnregister(ScheduleInfo scheduleInfo) {
+ if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
+ try {
+ scheduler.deleteJob(new
JobKey(scheduleInfo.getInlongGroupId()));
+ } catch (SchedulerException e) {
+ throw new QuartzScheduleException(e.getMessage());
+ }
+ }
+ scheduledJobSet.remove(scheduleInfo.getInlongGroupId());
+ LOGGER.info("Un-registered schedule info for {}",
scheduleInfo.getInlongGroupId());
+ return true;
+ }
+
+ /**
+ * Handle schedule update.
+ * @param scheduleInfo schedule info to update
+ * */
+ @Override
+ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
+ return handleUpdate(scheduleInfo, QuartzOfflineSyncJob.class);
+ }
+
+ @VisibleForTesting
+ public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends
QuartzOfflineSyncJob> clz) {
+ handleUnregister(scheduleInfo);
+ handleRegister(scheduleInfo, clz);
+ LOGGER.info("Updated schedule info for {}",
scheduleInfo.getInlongGroupId());
+ return false;
+ }
+
+ @Override
+ public void stop() {
+ if (scheduler != null) {
+ try {
+ scheduler.shutdown();
+ LOGGER.info("Quartz scheduler engine stopped");
+ } catch (SchedulerException e) {
+ throw new QuartzScheduleException("Failed to stop quartz
scheduler ", e);
+ }
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
new file mode 100644
index 0000000000..ca4f9f1b03
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.SchedulerListener;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+
+/**
+ * Default implementation for quartz scheduler listener.
+ * */
+public class QuartzSchedulerListener implements SchedulerListener {
+
+ QuartzScheduleEngine quartzScheduleEngine;
+
+ public QuartzSchedulerListener(QuartzScheduleEngine quartzScheduleEngine) {
+ this.quartzScheduleEngine = quartzScheduleEngine;
+ }
+
+ @Override
+ public void jobScheduled(Trigger trigger) {
+
+ }
+
+ @Override
+ public void jobUnscheduled(TriggerKey triggerKey) {
+
+ }
+
+ @Override
+ public void triggerFinalized(Trigger trigger) {
+ quartzScheduleEngine.triggerFinalized(trigger);
+ }
+
+ @Override
+ public void triggerPaused(TriggerKey triggerKey) {
+
+ }
+
+ @Override
+ public void triggersPaused(String triggerGroup) {
+
+ }
+
+ @Override
+ public void triggerResumed(TriggerKey triggerKey) {
+
+ }
+
+ @Override
+ public void triggersResumed(String triggerGroup) {
+
+ }
+
+ @Override
+ public void jobAdded(JobDetail jobDetail) {
+
+ }
+
+ @Override
+ public void jobDeleted(JobKey jobKey) {
+
+ }
+
+ @Override
+ public void jobPaused(JobKey jobKey) {
+
+ }
+
+ @Override
+ public void jobsPaused(String jobGroup) {
+
+ }
+
+ @Override
+ public void jobResumed(JobKey jobKey) {
+
+ }
+
+ @Override
+ public void jobsResumed(String jobGroup) {
+
+ }
+
+ @Override
+ public void schedulerError(String msg, SchedulerException cause) {
+
+ }
+
+ @Override
+ public void schedulerInStandbyMode() {
+
+ }
+
+ @Override
+ public void schedulerStarted() {
+
+ }
+
+ @Override
+ public void schedulerStarting() {
+
+ }
+
+ @Override
+ public void schedulerShutdown() {
+
+ }
+
+ @Override
+ public void schedulerShuttingdown() {
+
+ }
+
+ @Override
+ public void schedulingDataCleared() {
+
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
new file mode 100644
index 0000000000..73114fcb6c
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.util;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.ScheduleType;
+import org.apache.inlong.schedule.ScheduleUnit;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
+
+import org.apache.commons.lang3.StringUtils;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.ScheduleBuilder;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+/**
+ * Tools for schedule.
+ * */
+public class ScheduleUtils {
+
+ public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo,
Class<? extends QuartzOfflineSyncJob> clz) {
+ return JobBuilder.newJob(clz)
+ .withIdentity(scheduleInfo.getInlongGroupId())
+ .build();
+ }
+
+ public static Trigger genQuartzTrigger(JobDetail jobDetail, ScheduleInfo
scheduleInfo) {
+ String key = jobDetail.getKey().getName();
+ Timestamp startTime = scheduleInfo.getStartTime();
+ Timestamp endTime = scheduleInfo.getEndTime();
+ int scheduleType = scheduleInfo.getScheduleType();
+ ScheduleType type = ScheduleType.fromCode(scheduleType);
+ if (type == null) {
+ throw new QuartzScheduleException("Invalid schedule type: " +
scheduleType);
+ }
+ switch (type) {
+ case NORMAL:
+ return TriggerBuilder.newTrigger()
+ .withIdentity(key)
+ .startAt(new Date(startTime.getTime()))
+ .endAt(new Date(endTime.getTime()))
+
.withSchedule(genSimpleQuartzScheduleBuilder(scheduleInfo.getScheduleInterval(),
+ scheduleInfo.getScheduleUnit()))
+ .forJob(jobDetail).build();
+ case CRONTAB:
+ return TriggerBuilder.newTrigger()
+ .withIdentity(key)
+ .startAt(new Date(startTime.getTime()))
+ .endAt(new Date(endTime.getTime()))
+
.withSchedule(genCronQuartzScheduleBuilder(scheduleInfo.getCrontabExpression()))
+ .forJob(jobDetail).build();
+ default:
+ throw new QuartzScheduleException("Unknown schedule type: " +
scheduleType);
+ }
+ }
+
+ // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
+ public static ScheduleBuilder<SimpleTrigger>
genSimpleQuartzScheduleBuilder(int interval, String scheduleUnit) {
+ if (StringUtils.isBlank(scheduleUnit)) {
+ throw new QuartzScheduleException("Schedule unit cannot be empty");
+ }
+ ScheduleUnit unit = ScheduleUnit.getScheduleUnit(scheduleUnit);
+ if (unit == null) {
+ throw new QuartzScheduleException("Unknown schedule unit: " +
scheduleUnit);
+ }
+ switch (unit) {
+ case YEAR:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInHours(365 * 24 * interval)
+ .repeatForever();
+ case MONTH:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInHours(30 * 24 * interval)
+ .repeatForever();
+ case WEEK:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInHours(7 * 24 * interval)
+ .repeatForever();
+ case DAY:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInHours(24 * interval)
+ .repeatForever();
+ case HOUR:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInHours(interval)
+ .repeatForever();
+ case MINUTE:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInMinutes(interval)
+ .repeatForever();
+ case SECOND:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInSeconds(interval)
+ .repeatForever();
+ case ONE_WAY:
+ return SimpleScheduleBuilder
+ .simpleSchedule()
+ .withIntervalInSeconds(interval)
+ .withRepeatCount(1);
+ default:
+ throw new QuartzScheduleException("Not supported schedule
interval" + scheduleUnit);
+ }
+ }
+
+ public static ScheduleBuilder<CronTrigger>
genCronQuartzScheduleBuilder(String cronExpression) {
+ return
CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionIgnoreMisfires();
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
new file mode 100644
index 0000000000..c0ccc4b14c
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+
+import java.sql.Timestamp;
+
+import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
+
+public class BaseScheduleTest {
+
+ public static final int SCHEDULE_TYPE_NORMAL = 0;
+ public static final int SCHEDULE_TYPE_CRON = 1;
+ public static final int DEFAULT_INTERVAL = 2;
+ public static final long DEFAULT_SPAN_IN_MS = 10 * 1000;
+ public static final String ILLEGAL_TIMEUNIT = "I";
+ public static final String GROUP_ID = "testGroup";
+ public static final String CRON_EXPRESSION_PER_SECONDS = "*/1 * * * * ?";
+ public static final int CRON_SCHEDULE_INTERVAL_PER_SECONDS = 1;
+ public static final String CRON_EXPRESSION_EVERY_TWO_SECONDS = "*/2 * * *
* ?";
+ public static final int CRON_SCHEDULE_INTERVAL_EVERY_TWO_SECONDS = 2;
+ public static final String ILLEGAL_CRON_EXPRESSION = "*/1 * * ?";
+
+ public ScheduleInfo genDefaultScheduleInfo() {
+ return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(),
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
+ }
+
+ public ScheduleInfo genNormalScheduleInfo(String groupId, String
scheduleUnit, int scheduleInterval,
+ long timeSpanInMs) {
+ ScheduleInfo scheduleInfo = new ScheduleInfo();
+ scheduleInfo.setInlongGroupId(groupId);
+ scheduleInfo.setScheduleType(SCHEDULE_TYPE_NORMAL);
+ scheduleInfo.setScheduleUnit(scheduleUnit);
+ scheduleInfo.setScheduleInterval(scheduleInterval);
+ setStartAndEndTime(scheduleInfo, timeSpanInMs);
+ return scheduleInfo;
+ }
+
+ public ScheduleInfo genDefaultCronScheduleInfo() {
+ return genCronScheduleInfo(GROUP_ID, CRON_EXPRESSION_PER_SECONDS,
DEFAULT_SPAN_IN_MS);
+ }
+
+ public ScheduleInfo genCronScheduleInfo(String groupId, String
cronExpression, long timeSpanInMs) {
+ ScheduleInfo scheduleInfo = new ScheduleInfo();
+ scheduleInfo.setInlongGroupId(groupId);
+ scheduleInfo.setScheduleType(SCHEDULE_TYPE_CRON);
+ scheduleInfo.setCrontabExpression(cronExpression);
+ setStartAndEndTime(scheduleInfo, timeSpanInMs);
+ return scheduleInfo;
+ }
+
+ private void setStartAndEndTime(ScheduleInfo scheduleInfo, long
timeSpanInMs) {
+ long startTime = System.currentTimeMillis() / 1000 * 1000;
+ long endTime = startTime + timeSpanInMs;
+ scheduleInfo.setStartTime(new Timestamp(startTime));
+ scheduleInfo.setEndTime(new Timestamp(endTime));
+ }
+
+ protected long calculateScheduleTimes(ScheduleInfo scheduleInfo, boolean
isCron) {
+
+ long timeSpanInMs = scheduleInfo.getEndTime().getTime() -
scheduleInfo.getStartTime().getTime();
+ int interval = -1;
+ ScheduleUnit scheduleUnit = null;
+ if (isCron) {
+ if
(scheduleInfo.getCrontabExpression().equalsIgnoreCase(CRON_EXPRESSION_PER_SECONDS))
{
+ interval = CRON_SCHEDULE_INTERVAL_PER_SECONDS;
+ } else if
(scheduleInfo.getCrontabExpression().equalsIgnoreCase(CRON_EXPRESSION_EVERY_TWO_SECONDS))
{
+ interval = CRON_SCHEDULE_INTERVAL_EVERY_TWO_SECONDS;
+ }
+ scheduleUnit = SECOND;
+ } else {
+ interval = scheduleInfo.getScheduleInterval();
+ scheduleUnit =
ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit());
+ }
+ if (scheduleUnit == null) {
+ throw new QuartzScheduleException("Schedule unit is null");
+ }
+ switch (scheduleUnit) {
+ case YEAR:
+ return timeSpanInMs / 365 / 1000 / 3600 / 24 / 7 / interval;
+ case MONTH:
+ return timeSpanInMs / 30 / 1000 / 3600 / 24 / 7 / interval;
+ case WEEK:
+ return timeSpanInMs / 1000 / 3600 / 24 / 7 / interval;
+ case DAY:
+ return timeSpanInMs / 1000 / 3600 / 24 / interval;
+ case HOUR:
+ return timeSpanInMs / 1000 / 3600 / interval;
+ case MINUTE:
+ return timeSpanInMs / 1000 / 60 / interval;
+ case SECOND:
+ return timeSpanInMs / 1000 / interval;
+ case ONE_WAY:
+ return 1;
+ default:
+ return 0;
+ }
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
new file mode 100644
index 0000000000..47ce19ad09
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockJob extends QuartzOfflineSyncJob {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MockJob.class);
+
+ public static CountDownLatch countDownLatch;
+ private static AtomicInteger counter = new AtomicInteger(0);
+
+ @Override
+ public void execute(JobExecutionContext context) throws
JobExecutionException {
+ if (countDownLatch.getCount() > 0) {
+ countDownLatch.countDown();
+ }
+ LOGGER.info("MockJob executed " + counter.incrementAndGet());
+ }
+
+ public static void setCount(int count) {
+ countDownLatch = new CountDownLatch(count);
+ counter.set(0);
+ LOGGER.info("MockJob has been reset.");
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
new file mode 100644
index 0000000000..fc5ca9d9fd
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.BaseScheduleTest;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.quartz.JobKey;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QuartzScheduleEngineTest extends BaseScheduleTest {
+
+ private static QuartzScheduleEngine scheduleEngine;
+
+ @BeforeAll
+ public static void initScheduleEngine() throws Exception {
+ scheduleEngine = new QuartzScheduleEngine();
+ scheduleEngine.start();
+ }
+
+ @Test
+ @Timeout(30)
+ public void testRegisterScheduleInfo() throws Exception {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ testRegister(scheduleInfo, false);
+
+ // 2. test for cron schedule
+ scheduleInfo = genDefaultCronScheduleInfo();
+ testRegister(scheduleInfo, true);
+ }
+
+ private void testRegister(ScheduleInfo scheduleInfo, boolean isCrontab)
throws Exception {
+ // cal total schedule times
+ long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
+ // set countdown latch
+ MockJob.setCount((int) expectCount);
+ // register schedule info
+ scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
+ // check job exist
+ assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+ JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
+ boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
+ assertTrue(exist);
+ MockJob.countDownLatch.await();
+
+ // not job exist after scheduled
+ await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, scheduleEngine.getScheduledJobSet().size());
+ assertFalse(scheduleEngine.getScheduler().checkExists(jobKey));
+ });
+ }
+
+ @Test
+ @Timeout(30)
+ public void testUnRegisterScheduleInfo() throws Exception {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ testUnRegister(scheduleInfo, false);
+
+ // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+ scheduleInfo = genDefaultCronScheduleInfo();
+ testUnRegister(scheduleInfo, true);
+ }
+
+ private void testUnRegister(ScheduleInfo scheduleInfo, boolean isCrontab)
throws Exception {
+ // cal total schedule times
+ long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
+
+ MockJob.setCount((int) (expectCount / 2));
+ // register schedule info
+ scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
+ // check job exist
+ assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+ JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
+ boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
+ assertTrue(exist);
+ MockJob.countDownLatch.await();
+
+ // un-register before trigger finalized
+ scheduleEngine.handleUnregister(scheduleInfo);
+ // not job exist after un-register
+ assertEquals(0, scheduleEngine.getScheduledJobSet().size());
+ exist = scheduleEngine.getScheduler().checkExists(jobKey);
+ assertFalse(exist);
+ }
+
+ @Test
+ @Timeout(50)
+ public void testUpdateScheduleInfo() throws Exception {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo =
+ genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(),
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
+ ScheduleInfo scheduleInfoToUpdate =
+ genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(),
DEFAULT_INTERVAL / 2, DEFAULT_SPAN_IN_MS);
+ testUpdate(scheduleInfo, scheduleInfoToUpdate, false);
+
+ // 2. test for cron schedule
+ scheduleInfo = genCronScheduleInfo(GROUP_ID,
CRON_EXPRESSION_EVERY_TWO_SECONDS, DEFAULT_SPAN_IN_MS);
+ scheduleInfoToUpdate = genCronScheduleInfo(GROUP_ID,
CRON_EXPRESSION_PER_SECONDS, DEFAULT_SPAN_IN_MS);
+ testUpdate(scheduleInfo, scheduleInfoToUpdate, true);
+ }
+
+ public void testUpdate(ScheduleInfo scheduleInfo, ScheduleInfo
scheduleInfoToUpdate, boolean isCrontab)
+ throws Exception {
+ // cal total schedule times
+ long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
+ MockJob.setCount((int) (expectCount / 2));
+ // register schedule info
+ scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
+ // check job exist
+ assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+ JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
+ boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
+ assertTrue(exist);
+ MockJob.countDownLatch.await();
+
+ // update schedule before trigger finalized
+ expectCount = calculateScheduleTimes(scheduleInfoToUpdate, isCrontab);
+ MockJob.setCount((int) expectCount);
+ scheduleEngine.handleUpdate(scheduleInfoToUpdate, MockJob.class);
+
+ // job scheduled after updated
+ assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+ exist = scheduleEngine.getScheduler().checkExists(jobKey);
+ assertTrue(exist);
+
+ MockJob.countDownLatch.await();
+
+ // not job exist after scheduled
+ await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, scheduleEngine.getScheduledJobSet().size());
+ assertFalse(scheduleEngine.getScheduler().checkExists(jobKey));
+ });
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
new file mode 100644
index 0000000000..77b84f3064
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.util;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.BaseScheduleTest;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
+
+import org.junit.jupiter.api.Test;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.ScheduleBuilder;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+
+import java.util.Date;
+
+import static org.apache.inlong.schedule.ScheduleUnit.DAY;
+import static org.apache.inlong.schedule.ScheduleUnit.HOUR;
+import static org.apache.inlong.schedule.ScheduleUnit.MINUTE;
+import static org.apache.inlong.schedule.ScheduleUnit.MONTH;
+import static org.apache.inlong.schedule.ScheduleUnit.ONE_WAY;
+import static org.apache.inlong.schedule.ScheduleUnit.WEEK;
+import static org.apache.inlong.schedule.ScheduleUnit.YEAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ScheduleUtilsTest extends BaseScheduleTest {
+
+ @Test
+ public void testGenScheduleBuilder() {
+ ScheduleBuilder<SimpleTrigger> builder =
+ ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL,
YEAR.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ builder =
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, MONTH.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ builder =
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, WEEK.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ builder =
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, DAY.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ builder =
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, HOUR.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ builder =
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL,
MINUTE.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ builder =
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL,
ONE_WAY.getUnit());
+ assertNotNull(builder);
+ assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+ try {
+ ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL,
ILLEGAL_TIMEUNIT);
+ } catch (Exception e) {
+ assertInstanceOf(QuartzScheduleException.class, e);
+ }
+
+ ScheduleBuilder<CronTrigger> cronBuilder =
ScheduleUtils.genCronQuartzScheduleBuilder(
+ CRON_EXPRESSION_EVERY_TWO_SECONDS);
+ assertNotNull(cronBuilder);
+ assertInstanceOf(CronScheduleBuilder.class, cronBuilder);
+
+ try {
+
ScheduleUtils.genCronQuartzScheduleBuilder(ILLEGAL_CRON_EXPRESSION);
+ } catch (Exception e) {
+ String errorMsg = e.getMessage();
+ assertTrue(errorMsg.contains(ILLEGAL_CRON_EXPRESSION));
+ }
+ }
+
+ @Test
+ public void testGenJobDetail() {
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
QuartzOfflineSyncJob.class);
+ assertNotNull(jobDetail);
+
+ JobKey jobKey = jobDetail.getKey();
+ assertNotNull(jobKey);
+
+ String identity = jobKey.getName();
+ assertEquals(scheduleInfo.getInlongGroupId(), identity);
+ }
+
+ @Test
+ public void testGenCronTrigger() {
+ // normal
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
QuartzOfflineSyncJob.class);
+
+ Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail,
scheduleInfo);
+ assertNotNull(trigger);
+
+ TriggerKey triggerKey = trigger.getKey();
+ assertNotNull(triggerKey);
+ String identity = triggerKey.getName();
+ assertEquals(scheduleInfo.getInlongGroupId(), identity);
+
+ ScheduleBuilder<? extends Trigger> scheduleBuilder =
trigger.getScheduleBuilder();
+ assertInstanceOf(SimpleScheduleBuilder.class, scheduleBuilder);
+
+ Date startDate = trigger.getStartTime();
+ assertNotNull(startDate);
+ assertEquals(startDate.getTime(),
scheduleInfo.getStartTime().getTime());
+
+ Date endDate = trigger.getEndTime();
+ assertNotNull(endDate);
+ assertEquals(endDate.getTime(), scheduleInfo.getEndTime().getTime());
+
+ // cron
+ scheduleInfo = genDefaultCronScheduleInfo();
+ jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
QuartzOfflineSyncJob.class);
+
+ trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
+ assertNotNull(trigger);
+
+ triggerKey = trigger.getKey();
+ assertNotNull(triggerKey);
+ identity = triggerKey.getName();
+ assertEquals(scheduleInfo.getInlongGroupId(), identity);
+
+ scheduleBuilder = trigger.getScheduleBuilder();
+ assertInstanceOf(CronScheduleBuilder.class, scheduleBuilder);
+
+ startDate = trigger.getStartTime();
+ assertNotNull(startDate);
+ assertEquals(startDate.getTime(),
scheduleInfo.getStartTime().getTime());
+
+ endDate = trigger.getEndTime();
+ assertNotNull(endDate);
+ assertEquals(endDate.getTime(), scheduleInfo.getEndTime().getTime());
+
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/test/resources/log4j2.xml
b/inlong-manager/manager-schedule/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..8f2da663e7
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/test/resources/log4j2.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration status="WARN" monitorInterval="30">
+ <Properties>
+ <property name="basePath">logs</property>
+ <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%5.30t]
%-30.30C{1.}:%L - %m%n</property>
+ <property name="output_log_level">DEBUG</property>
+ <property
name="all_fileName">${basePath}/manager-service-ut.log</property>
+ <property name="console_print_level">DEBUG</property>
+ </Properties>
+
+ <appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT"
onMismatch="DENY"/>
+ <PatternLayout pattern="${log_pattern}"/>
+ <follow>true</follow>
+ </Console>
+ <File name="AllFile" fileName="${all_fileName}">
+ <PatternLayout pattern="${log_pattern}"/>
+ </File>
+ </appenders>
+
+ <loggers>
+ <root level="${output_log_level}">
+ <appender-ref ref="Console"/>
+ <appender-ref ref="AllFile"/>
+ </root>
+ </loggers>
+</configuration>
\ No newline at end of file
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 4a9814bdc0..d29fcc9814 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -911,7 +911,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id,
undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
- `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+ `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'End time for schedule',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 4fe739c99e..fc7820fa82 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -961,7 +961,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id,
undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
- `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+ `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'End time for schedule',
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index 4734495338..adbcf5d268 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong
group id, undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
- `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+ `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'End time for schedule',
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index bff220728b..cc7421d1c1 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -40,8 +40,8 @@
<module>manager-service</module>
<module>manager-workflow</module>
<module>manager-web</module>
- <module>manager-docker</module>
<module>manager-schedule</module>
+ <module>manager-docker</module>
</modules>
<properties>