This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f5950f92321ee05806092466aa37daa5abfa27a0
Author: AloysZhang <[email protected]>
AuthorDate: Thu Jun 13 22:44:42 2024 +0800

    [INLONG-10396][Manager] Support build-in schedule base on quartz (#10412)
---
 .../inlong/manager/dao/entity/ScheduleEntity.java  |   2 +-
 .../inlong/manager/pojo/schedule/ScheduleInfo.java |   4 +-
 .../manager/pojo/schedule/ScheduleInfoRequest.java |   4 +-
 inlong-manager/manager-schedule/pom.xml            |  31 ++++
 .../org/apache/inlong/schedule/ScheduleType.java   |  45 ++++++
 .../org/apache/inlong/schedule/ScheduleUnit.java   |  48 ++++++
 .../exception/QuartzScheduleException.java         |  32 ++++
 .../schedule/quartz/QuartzOfflineSyncJob.java      |  40 +++++
 .../schedule/quartz/QuartzScheduleClient.java      |  50 +++++++
 .../schedule/quartz/QuartzScheduleEngine.java      | 156 ++++++++++++++++++++
 .../schedule/quartz/QuartzSchedulerListener.java   | 137 +++++++++++++++++
 .../apache/inlong/schedule/util/ScheduleUtils.java | 139 +++++++++++++++++
 .../apache/inlong/schedule/BaseScheduleTest.java   | 116 +++++++++++++++
 .../org/apache/inlong/schedule/quartz/MockJob.java |  48 ++++++
 .../schedule/quartz/QuartzScheduleEngineTest.java  | 161 ++++++++++++++++++++
 .../inlong/schedule/util/ScheduleUtilsTest.java    | 164 +++++++++++++++++++++
 .../manager-schedule/src/test/resources/log4j2.xml |  46 ++++++
 .../main/resources/h2/apache_inlong_manager.sql    |   2 +-
 .../manager-web/sql/apache_inlong_manager.sql      |   2 +-
 inlong-manager/manager-web/sql/changes-1.13.0.sql  |   2 +-
 inlong-manager/pom.xml                             |   2 +-
 21 files changed, 1222 insertions(+), 9 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
index 75237343cb..b2d49cd43c 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -34,7 +34,7 @@ public class ScheduleEntity implements Serializable {
     // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
     private Integer scheduleType;
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
-    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
     private String scheduleUnit;
     private Integer scheduleInterval;
     // schedule start time, long type timestamp
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index 13afce70a4..2386d817bb 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -50,7 +50,7 @@ public class ScheduleInfo {
     private Integer scheduleType;
 
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
-    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
     @ApiModelProperty("TimeUnit for schedule interval")
     private String scheduleUnit;
 
@@ -73,7 +73,7 @@ public class ScheduleInfo {
     private Integer taskParallelism;
 
     @ApiModelProperty("Schedule task parallelism")
-    private Integer crontabExpression;
+    private String crontabExpression;
 
     @ApiModelProperty(value = "Version number")
     @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index eff1660719..b3c117da9a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -44,7 +44,7 @@ public class ScheduleInfoRequest {
     private Integer scheduleType;
 
     // time unit for offline task schedule interval, support [month, week, 
day, hour, minute, oneway]
-    // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+    // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
     @ApiModelProperty("TimeUnit for schedule interval")
     private String scheduleUnit;
 
@@ -67,7 +67,7 @@ public class ScheduleInfoRequest {
     private Integer taskParallelism;
 
     @ApiModelProperty("Schedule task parallelism")
-    private Integer crontabExpression;
+    private String crontabExpression;
 
     @ApiModelProperty(value = "Version number")
     @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
diff --git a/inlong-manager/manager-schedule/pom.xml 
b/inlong-manager/manager-schedule/pom.xml
index f3a4e0a41f..8d598b47ca 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -27,6 +27,7 @@
     <artifactId>manager-schedule</artifactId>
 
     <properties>
+        <quartz.version>2.3.2</quartz.version>
         <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
@@ -37,5 +38,35 @@
             <artifactId>manager-pojo</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+            <version>${quartz.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.mchange</groupId>
+                    <artifactId>c3p0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.mchange</groupId>
+                    <artifactId>mchange-commons-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.zaxxer</groupId>
+                    <artifactId>HikariCP-java7</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
new file mode 100644
index 0000000000..0f296fec42
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule;
+
+import lombok.Getter;
+
+/**
+ * Schedule types known to the schedule module: {@code NORMAL} (code 0) and {@code CRONTAB} (code 1).
+ */
+@Getter
+public enum ScheduleType {
+
+    NORMAL(0),
+    CRONTAB(1);
+
+    /** Numeric code identifying this schedule type. */
+    private final int code;
+
+    ScheduleType(int code) {
+        this.code = code;
+    }
+
+    /**
+     * Resolves the schedule type that carries the given code.
+     *
+     * @param code numeric code to look up
+     * @return the matching type, or {@code null} when no constant uses this code
+     */
+    public static ScheduleType fromCode(int code) {
+        ScheduleType matched = null;
+        ScheduleType[] candidates = values();
+        for (int i = 0; i < candidates.length && matched == null; i++) {
+            if (candidates[i].code == code) {
+                matched = candidates[i];
+            }
+        }
+        return matched;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
new file mode 100644
index 0000000000..7b4d60779f
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule;
+
+import lombok.Getter;
+
+/**
+ * Time units of a schedule interval, each identified by a one-letter symbol:
+ * Y=year, M=month, D=day, W=week, H=hour, I=minute, S=second, O=one way.
+ */
+@Getter
+public enum ScheduleUnit {
+
+    YEAR("Y"),
+    MONTH("M"),
+    DAY("D"),
+    WEEK("W"),
+    HOUR("H"),
+    MINUTE("I"),
+    SECOND("S"),
+    ONE_WAY("O");
+
+    /** One-letter symbol of this unit. */
+    final String unit;
+
+    ScheduleUnit(String unit) {
+        this.unit = unit;
+    }
+
+    /**
+     * Resolves a unit from its symbol, ignoring case.
+     *
+     * @param unit symbol to look up
+     * @return the matching unit, or {@code null} when no constant uses this symbol
+     */
+    public static ScheduleUnit getScheduleUnit(String unit) {
+        ScheduleUnit[] all = values();
+        int idx = 0;
+        while (idx < all.length) {
+            ScheduleUnit candidate = all[idx];
+            if (candidate.unit.equalsIgnoreCase(unit)) {
+                return candidate;
+            }
+            idx++;
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
new file mode 100644
index 0000000000..2d0ff005bd
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.exception;
+
+/**
+ * Unchecked exception raised when something goes wrong in the schedule procedure.
+ */
+public class QuartzScheduleException extends RuntimeException {
+
+    /**
+     * @param message description of the failure
+     * @param cause underlying exception that triggered the failure
+     */
+    public QuartzScheduleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message description of the failure
+     */
+    public QuartzScheduleException(String message) {
+        super(message);
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
new file mode 100644
index 0000000000..27628b3320
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+/**
+ * Quartz {@link Job} placeholder for offline sync; it holds a {@link ScheduleInfo} and its
+ * {@link #execute(JobExecutionContext)} body is not implemented yet.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class QuartzOfflineSyncJob implements Job {
+
+    /** Schedule info carried by this job instance. */
+    private ScheduleInfo scheduleInfo;
+
+    /**
+     * Invoked by Quartz when a trigger of this job fires; currently does nothing.
+     *
+     * @param context execution context supplied by Quartz
+     */
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        // TODO: complete the offline sync logic
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
new file mode 100644
index 0000000000..41fc8814a1
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.ScheduleEngineClient;
+
+/**
+ * Built-in schedule engine client paired with {@link QuartzScheduleEngine}.
+ * Every call is forwarded directly to the in-process {@link QuartzScheduleEngine} to
+ * register/unregister/update schedule info instead of calling a remote schedule service.
+ */
+public class QuartzScheduleClient implements ScheduleEngineClient {
+
+    /** Engine that receives every request made through this client. */
+    private final QuartzScheduleEngine engine;
+
+    public QuartzScheduleClient(QuartzScheduleEngine scheduleEngine) {
+        this.engine = scheduleEngine;
+    }
+
+    /**
+     * Forwards the registration to {@link QuartzScheduleEngine#handleRegister(ScheduleInfo)}.
+     *
+     * @param scheduleInfo schedule info to register
+     * @return whatever the engine returns
+     */
+    @Override
+    public boolean register(ScheduleInfo scheduleInfo) {
+        boolean result = engine.handleRegister(scheduleInfo);
+        return result;
+    }
+
+    /**
+     * Forwards the removal to {@link QuartzScheduleEngine#handleUnregister(ScheduleInfo)}.
+     *
+     * @param scheduleInfo schedule info to unregister
+     * @return whatever the engine returns
+     */
+    @Override
+    public boolean unregister(ScheduleInfo scheduleInfo) {
+        boolean result = engine.handleUnregister(scheduleInfo);
+        return result;
+    }
+
+    /**
+     * Forwards the update to {@link QuartzScheduleEngine#handleUpdate(ScheduleInfo)}.
+     *
+     * @param scheduleInfo schedule info to update
+     * @return whatever the engine returns
+     */
+    @Override
+    public boolean update(ScheduleInfo scheduleInfo) {
+        boolean result = engine.handleUpdate(scheduleInfo);
+        return result;
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
new file mode 100644
index 0000000000..31736f1887
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.ScheduleEngine;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzJobDetail;
+import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzTrigger;
+
+/**
+ * The default implementation of schedule engine based on Quartz scheduler. Responsible for processing
+ * the register/unregister/update requests from {@link QuartzScheduleClient}.
+ *
+ * <p>Jobs are keyed by inlong group id. The ids of the jobs currently scheduled are tracked in
+ * {@code scheduledJobSet}; an id is dropped on unregister and when the job's trigger is finalized.
+ */
+@Getter
+public class QuartzScheduleEngine implements ScheduleEngine {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(QuartzScheduleEngine.class);
+
+    private final Scheduler scheduler;
+    // NOTE(review): plain HashSet that is mutated both by the handle* methods and by triggerFinalized(), which
+    // is invoked through QuartzSchedulerListener - confirm those never run on different threads concurrently.
+    private final Set<String> scheduledJobSet = new HashSet<>();
+
+    /**
+     * Creates the engine on top of the scheduler provided by {@link StdSchedulerFactory}.
+     *
+     * @throws QuartzScheduleException if the Quartz scheduler cannot be obtained
+     */
+    public QuartzScheduleEngine() {
+        try {
+            this.scheduler = new StdSchedulerFactory().getScheduler();
+            LOGGER.info("Quartz scheduler engine initialized");
+        } catch (SchedulerException e) {
+            throw new QuartzScheduleException("Failed to init quartz scheduler ", e);
+        }
+    }
+
+    /**
+     * Attaches a {@link QuartzSchedulerListener} to the scheduler and starts it.
+     *
+     * @throws QuartzScheduleException if the scheduler fails to start
+     */
+    @Override
+    public void start() {
+        try {
+            // add listener
+            scheduler.getListenerManager().addSchedulerListener(new QuartzSchedulerListener(this));
+            scheduler.start();
+            LOGGER.info("Quartz scheduler engine started");
+        } catch (SchedulerException e) {
+            throw new QuartzScheduleException("Failed to start quartz scheduler ", e);
+        }
+    }
+
+    /**
+     * Clean job info from scheduledJobSet after trigger finalized.
+     *
+     * @param trigger the finalized trigger; the name of its job key is removed from the set
+     * @return {@code true} if the job name was present in the set
+     */
+    public boolean triggerFinalized(Trigger trigger) {
+        String jobName = trigger.getJobKey().getName();
+        LOGGER.info("Trigger finalized for job {}", jobName);
+        return scheduledJobSet.remove(jobName);
+    }
+
+    /**
+     * Handle schedule register.
+     *
+     * @param scheduleInfo schedule info to register
+     * @return {@code true} once the job has been scheduled
+     * @throws QuartzScheduleException if the group is already registered or scheduling fails
+     */
+    @Override
+    public boolean handleRegister(ScheduleInfo scheduleInfo) {
+        return handleRegister(scheduleInfo, QuartzOfflineSyncJob.class);
+    }
+
+    /**
+     * Registers the schedule info using the given job class.
+     *
+     * @param scheduleInfo schedule info to register
+     * @param clz job class Quartz instantiates when the trigger fires
+     * @return {@code true} once the job has been scheduled
+     * @throws QuartzScheduleException if the group is already registered or scheduling fails
+     */
+    @VisibleForTesting
+    public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) {
+        String groupId = scheduleInfo.getInlongGroupId();
+        if (scheduledJobSet.contains(groupId)) {
+            throw new QuartzScheduleException("Group " + groupId + " is already registered");
+        }
+        JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz);
+        Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo);
+        try {
+            scheduler.scheduleJob(jobDetail, trigger);
+            // only remember the group after Quartz accepted the job
+            scheduledJobSet.add(groupId);
+            LOGGER.info("Registered new schedule info for {}", groupId);
+        } catch (SchedulerException e) {
+            // keep the original exception as cause so the stack trace is not lost
+            throw new QuartzScheduleException(e.getMessage(), e);
+        }
+        // success is reported as true, consistent with handleUnregister
+        return true;
+    }
+
+    /**
+     * Handle schedule unregister. Unregistering a group that is not registered is a no-op.
+     *
+     * @param scheduleInfo schedule info to unregister
+     * @return always {@code true}
+     * @throws QuartzScheduleException if Quartz fails to delete the job
+     */
+    @Override
+    public boolean handleUnregister(ScheduleInfo scheduleInfo) {
+        String groupId = scheduleInfo.getInlongGroupId();
+        if (scheduledJobSet.contains(groupId)) {
+            try {
+                scheduler.deleteJob(new JobKey(groupId));
+            } catch (SchedulerException e) {
+                throw new QuartzScheduleException(e.getMessage(), e);
+            }
+        }
+        scheduledJobSet.remove(groupId);
+        LOGGER.info("Un-registered schedule info for {}", groupId);
+        return true;
+    }
+
+    /**
+     * Handle schedule update.
+     *
+     * @param scheduleInfo schedule info to update
+     * @return {@code true} once the updated schedule has been registered
+     * @throws QuartzScheduleException if removing the old job or scheduling the new one fails
+     */
+    @Override
+    public boolean handleUpdate(ScheduleInfo scheduleInfo) {
+        return handleUpdate(scheduleInfo, QuartzOfflineSyncJob.class);
+    }
+
+    /**
+     * Updates a schedule by unregistering the group and registering it again with the given job class.
+     *
+     * @param scheduleInfo schedule info to update
+     * @param clz job class Quartz instantiates when the trigger fires
+     * @return {@code true} once the updated schedule has been registered
+     * @throws QuartzScheduleException if removing the old job or scheduling the new one fails
+     */
+    @VisibleForTesting
+    public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) {
+        handleUnregister(scheduleInfo);
+        boolean registered = handleRegister(scheduleInfo, clz);
+        LOGGER.info("Updated schedule info for {}", scheduleInfo.getInlongGroupId());
+        return registered;
+    }
+
+    /**
+     * Shuts the Quartz scheduler down.
+     *
+     * @throws QuartzScheduleException if the scheduler fails to shut down
+     */
+    @Override
+    public void stop() {
+        if (scheduler != null) {
+            try {
+                scheduler.shutdown();
+                LOGGER.info("Quartz scheduler engine stopped");
+            } catch (SchedulerException e) {
+                throw new QuartzScheduleException("Failed to stop quartz scheduler ", e);
+            }
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
new file mode 100644
index 0000000000..ca4f9f1b03
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.SchedulerListener;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+
+/**
+ * Default quartz scheduler listener. Only {@link #triggerFinalized(Trigger)} does any work: it hands the
+ * trigger to the owning {@link QuartzScheduleEngine}. Every other callback is a no-op.
+ */
+public class QuartzSchedulerListener implements SchedulerListener {
+
+    QuartzScheduleEngine quartzScheduleEngine;
+
+    public QuartzSchedulerListener(QuartzScheduleEngine quartzScheduleEngine) {
+        this.quartzScheduleEngine = quartzScheduleEngine;
+    }
+
+    /**
+     * Passes the finalized trigger on to the engine.
+     *
+     * @param trigger trigger reported as finalized by Quartz
+     */
+    @Override
+    public void triggerFinalized(Trigger trigger) {
+        quartzScheduleEngine.triggerFinalized(trigger);
+    }
+
+    @Override
+    public void jobScheduled(Trigger trigger) {
+        // no-op
+    }
+
+    @Override
+    public void jobUnscheduled(TriggerKey triggerKey) {
+        // no-op
+    }
+
+    @Override
+    public void triggerPaused(TriggerKey triggerKey) {
+        // no-op
+    }
+
+    @Override
+    public void triggersPaused(String triggerGroup) {
+        // no-op
+    }
+
+    @Override
+    public void triggerResumed(TriggerKey triggerKey) {
+        // no-op
+    }
+
+    @Override
+    public void triggersResumed(String triggerGroup) {
+        // no-op
+    }
+
+    @Override
+    public void jobAdded(JobDetail jobDetail) {
+        // no-op
+    }
+
+    @Override
+    public void jobDeleted(JobKey jobKey) {
+        // no-op
+    }
+
+    @Override
+    public void jobPaused(JobKey jobKey) {
+        // no-op
+    }
+
+    @Override
+    public void jobsPaused(String jobGroup) {
+        // no-op
+    }
+
+    @Override
+    public void jobResumed(JobKey jobKey) {
+        // no-op
+    }
+
+    @Override
+    public void jobsResumed(String jobGroup) {
+        // no-op
+    }
+
+    @Override
+    public void schedulerError(String msg, SchedulerException cause) {
+        // no-op
+    }
+
+    @Override
+    public void schedulerInStandbyMode() {
+        // no-op
+    }
+
+    @Override
+    public void schedulerStarted() {
+        // no-op
+    }
+
+    @Override
+    public void schedulerStarting() {
+        // no-op
+    }
+
+    @Override
+    public void schedulerShutdown() {
+        // no-op
+    }
+
+    @Override
+    public void schedulerShuttingdown() {
+        // no-op
+    }
+
+    @Override
+    public void schedulingDataCleared() {
+        // no-op
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
new file mode 100644
index 0000000000..73114fcb6c
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.util;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.ScheduleType;
+import org.apache.inlong.schedule.ScheduleUnit;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
+
+import org.apache.commons.lang3.StringUtils;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.ScheduleBuilder;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+/**
+ * Tools for schedule: translate a {@link ScheduleInfo} into Quartz job details, triggers and schedule builders.
+ */
+public class ScheduleUtils {
+
+    private static final int HOURS_PER_DAY = 24;
+    private static final int DAYS_PER_WEEK = 7;
+    // month and year are approximated with fixed lengths of 30 and 365 days
+    private static final int DAYS_PER_MONTH = 30;
+    private static final int DAYS_PER_YEAR = 365;
+
+    private ScheduleUtils() {
+        // static helpers only, not meant to be instantiated
+    }
+
+    /**
+     * Builds the Quartz job detail for a schedule, using the inlong group id as job identity.
+     *
+     * @param scheduleInfo schedule info providing the inlong group id
+     * @param clz job class to run
+     * @return job detail identified by the inlong group id
+     */
+    public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) {
+        return JobBuilder.newJob(clz)
+                .withIdentity(scheduleInfo.getInlongGroupId())
+                .build();
+    }
+
+    /**
+     * Builds the Quartz trigger for a job according to the schedule type of the schedule info.
+     * The trigger shares its name with the job key and is bounded by the schedule's start and end time.
+     *
+     * @param jobDetail job the trigger fires
+     * @param scheduleInfo schedule info providing type, time range and interval/unit or cron expression
+     * @return trigger bound to {@code jobDetail}
+     * @throws QuartzScheduleException if the schedule type, interval, unit, start time or end time
+     *         is missing or not supported
+     */
+    public static Trigger genQuartzTrigger(JobDetail jobDetail, ScheduleInfo scheduleInfo) {
+        Integer scheduleType = scheduleInfo.getScheduleType();
+        if (scheduleType == null) {
+            // fail with a descriptive error instead of an unboxing NullPointerException
+            throw new QuartzScheduleException("Schedule type cannot be null");
+        }
+        ScheduleType type = ScheduleType.fromCode(scheduleType);
+        if (type == null) {
+            throw new QuartzScheduleException("Invalid schedule type: " + scheduleType);
+        }
+        switch (type) {
+            case NORMAL: {
+                Integer interval = scheduleInfo.getScheduleInterval();
+                if (interval == null) {
+                    throw new QuartzScheduleException("Schedule interval cannot be null");
+                }
+                return buildTrigger(jobDetail, scheduleInfo,
+                        genSimpleQuartzScheduleBuilder(interval, scheduleInfo.getScheduleUnit()));
+            }
+            case CRONTAB:
+                return buildTrigger(jobDetail, scheduleInfo,
+                        genCronQuartzScheduleBuilder(scheduleInfo.getCrontabExpression()));
+            default:
+                throw new QuartzScheduleException("Unknown schedule type: " + scheduleType);
+        }
+    }
+
+    /** Assembles the trigger shared by all schedule types: identity, time range, schedule and target job. */
+    private static <T extends Trigger> Trigger buildTrigger(JobDetail jobDetail, ScheduleInfo scheduleInfo,
+            ScheduleBuilder<T> schedule) {
+        Timestamp startTime = scheduleInfo.getStartTime();
+        Timestamp endTime = scheduleInfo.getEndTime();
+        if (startTime == null || endTime == null) {
+            throw new QuartzScheduleException("Schedule start time and end time cannot be null");
+        }
+        return TriggerBuilder.newTrigger()
+                .withIdentity(jobDetail.getKey().getName())
+                .startAt(new Date(startTime.getTime()))
+                .endAt(new Date(endTime.getTime()))
+                .withSchedule(schedule)
+                .forJob(jobDetail)
+                .build();
+    }
+
+    /**
+     * Builds a simple (fixed interval) Quartz schedule.
+     * Supported unit symbols: Y=year, M=month, W=week, D=day, H=hour, I=minute, S=second, O=oneway.
+     *
+     * @param interval number of units between two firings
+     * @param scheduleUnit unit symbol, matched case-insensitively
+     * @return schedule builder for the given interval and unit
+     * @throws QuartzScheduleException if the unit is blank or unknown
+     */
+    public static ScheduleBuilder<SimpleTrigger> genSimpleQuartzScheduleBuilder(int interval, String scheduleUnit) {
+        if (StringUtils.isBlank(scheduleUnit)) {
+            throw new QuartzScheduleException("Schedule unit cannot be empty");
+        }
+        ScheduleUnit unit = ScheduleUnit.getScheduleUnit(scheduleUnit);
+        if (unit == null) {
+            throw new QuartzScheduleException("Unknown schedule unit: " + scheduleUnit);
+        }
+        switch (unit) {
+            case YEAR:
+                return repeatEveryHours(DAYS_PER_YEAR * HOURS_PER_DAY * interval);
+            case MONTH:
+                return repeatEveryHours(DAYS_PER_MONTH * HOURS_PER_DAY * interval);
+            case WEEK:
+                return repeatEveryHours(DAYS_PER_WEEK * HOURS_PER_DAY * interval);
+            case DAY:
+                return repeatEveryHours(HOURS_PER_DAY * interval);
+            case HOUR:
+                return repeatEveryHours(interval);
+            case MINUTE:
+                return SimpleScheduleBuilder
+                        .simpleSchedule()
+                        .withIntervalInMinutes(interval)
+                        .repeatForever();
+            case SECOND:
+                return SimpleScheduleBuilder
+                        .simpleSchedule()
+                        .withIntervalInSeconds(interval)
+                        .repeatForever();
+            case ONE_WAY:
+                // NOTE(review): Quartz counts repeats in addition to the first firing, so a repeat count of 1
+                // means two firings in total - confirm this is intended for a "one way" schedule.
+                return SimpleScheduleBuilder
+                        .simpleSchedule()
+                        .withIntervalInSeconds(interval)
+                        .withRepeatCount(1);
+            default:
+                throw new QuartzScheduleException("Not supported schedule unit: " + scheduleUnit);
+        }
+    }
+
+    /** Builds a schedule that repeats forever with the given interval in hours. */
+    private static SimpleScheduleBuilder repeatEveryHours(int hours) {
+        return SimpleScheduleBuilder
+                .simpleSchedule()
+                .withIntervalInHours(hours)
+                .repeatForever();
+    }
+
+    /**
+     * Builds a cron based Quartz schedule that uses the "ignore misfires" misfire instruction.
+     *
+     * @param cronExpression Quartz cron expression
+     * @return schedule builder for the cron expression
+     */
+    public static ScheduleBuilder<CronTrigger> genCronQuartzScheduleBuilder(String cronExpression) {
+        return CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionIgnoreMisfires();
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
new file mode 100644
index 0000000000..c0ccc4b14c
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+
+import java.sql.Timestamp;
+
+import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
+
+public class BaseScheduleTest {
+
+    public static final int SCHEDULE_TYPE_NORMAL = 0;
+    public static final int SCHEDULE_TYPE_CRON = 1;
+    public static final int DEFAULT_INTERVAL = 2;
+    public static final long DEFAULT_SPAN_IN_MS = 10 * 1000;
+    public static final String ILLEGAL_TIMEUNIT = "I";
+    public static final String GROUP_ID = "testGroup";
+    public static final String CRON_EXPRESSION_PER_SECONDS = "*/1 * * * * ?";
+    public static final int CRON_SCHEDULE_INTERVAL_PER_SECONDS = 1;
+    public static final String CRON_EXPRESSION_EVERY_TWO_SECONDS = "*/2 * * * 
* ?";
+    public static final int CRON_SCHEDULE_INTERVAL_EVERY_TWO_SECONDS = 2;
+    public static final String ILLEGAL_CRON_EXPRESSION = "*/1 * * ?";
+
+    public ScheduleInfo genDefaultScheduleInfo() {
+        return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), 
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
+    }
+
+    public ScheduleInfo genNormalScheduleInfo(String groupId, String 
scheduleUnit, int scheduleInterval,
+            long timeSpanInMs) {
+        ScheduleInfo scheduleInfo = new ScheduleInfo();
+        scheduleInfo.setInlongGroupId(groupId);
+        scheduleInfo.setScheduleType(SCHEDULE_TYPE_NORMAL);
+        scheduleInfo.setScheduleUnit(scheduleUnit);
+        scheduleInfo.setScheduleInterval(scheduleInterval);
+        setStartAndEndTime(scheduleInfo, timeSpanInMs);
+        return scheduleInfo;
+    }
+
+    public ScheduleInfo genDefaultCronScheduleInfo() {
+        return genCronScheduleInfo(GROUP_ID, CRON_EXPRESSION_PER_SECONDS, 
DEFAULT_SPAN_IN_MS);
+    }
+
+    public ScheduleInfo genCronScheduleInfo(String groupId, String 
cronExpression, long timeSpanInMs) {
+        ScheduleInfo scheduleInfo = new ScheduleInfo();
+        scheduleInfo.setInlongGroupId(groupId);
+        scheduleInfo.setScheduleType(SCHEDULE_TYPE_CRON);
+        scheduleInfo.setCrontabExpression(cronExpression);
+        setStartAndEndTime(scheduleInfo, timeSpanInMs);
+        return scheduleInfo;
+    }
+
+    private void setStartAndEndTime(ScheduleInfo scheduleInfo, long 
timeSpanInMs) {
+        long startTime = System.currentTimeMillis() / 1000 * 1000;
+        long endTime = startTime + timeSpanInMs;
+        scheduleInfo.setStartTime(new Timestamp(startTime));
+        scheduleInfo.setEndTime(new Timestamp(endTime));
+    }
+
+    protected long calculateScheduleTimes(ScheduleInfo scheduleInfo, boolean 
isCron) {
+        // expected executions: time span / (length of one unit * interval); ONE_WAY runs once
+        long timeSpanInMs = scheduleInfo.getEndTime().getTime() - 
scheduleInfo.getStartTime().getTime();
+        int interval = -1;
+        ScheduleUnit scheduleUnit = null;
+        if (isCron) {
+            if 
(scheduleInfo.getCrontabExpression().equalsIgnoreCase(CRON_EXPRESSION_PER_SECONDS))
 {
+                interval = CRON_SCHEDULE_INTERVAL_PER_SECONDS;
+            } else if 
(scheduleInfo.getCrontabExpression().equalsIgnoreCase(CRON_EXPRESSION_EVERY_TWO_SECONDS))
 {
+                interval = CRON_SCHEDULE_INTERVAL_EVERY_TWO_SECONDS;
+            }
+            scheduleUnit = SECOND;
+        } else {
+            interval = scheduleInfo.getScheduleInterval();
+            scheduleUnit = 
ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit());
+        }
+        if (scheduleUnit == null) {
+            throw new QuartzScheduleException("Schedule unit is null");
+        }
+        switch (scheduleUnit) {
+            case YEAR:
+                return timeSpanInMs / 1000 / 3600 / 24 / 365 / interval; // 365-day year
+            case MONTH:
+                return timeSpanInMs / 1000 / 3600 / 24 / 30 / interval; // 30-day month
+            case WEEK:
+                return timeSpanInMs / 1000 / 3600 / 24 / 7 / interval;
+            case DAY:
+                return timeSpanInMs / 1000 / 3600 / 24 / interval;
+            case HOUR:
+                return timeSpanInMs / 1000 / 3600 / interval;
+            case MINUTE:
+                return timeSpanInMs / 1000 / 60 / interval;
+            case SECOND:
+                return timeSpanInMs / 1000 / interval;
+            case ONE_WAY:
+                return 1;
+            default:
+                return 0;
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
new file mode 100644
index 0000000000..47ce19ad09
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockJob extends QuartzOfflineSyncJob {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MockJob.class);
+
+    public static CountDownLatch countDownLatch;
+    private static AtomicInteger counter = new AtomicInteger(0);
+
+    @Override
+    public void execute(JobExecutionContext context) throws 
JobExecutionException {
+        if (countDownLatch.getCount() > 0) {
+            countDownLatch.countDown();
+        }
+        LOGGER.info("MockJob executed " + counter.incrementAndGet());
+    }
+
+    public static void setCount(int count) {
+        countDownLatch = new CountDownLatch(count);
+        counter.set(0);
+        LOGGER.info("MockJob has been reset.");
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
new file mode 100644
index 0000000000..fc5ca9d9fd
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.quartz;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.BaseScheduleTest;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.quartz.JobKey;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QuartzScheduleEngineTest extends BaseScheduleTest {
+
+    private static QuartzScheduleEngine scheduleEngine;
+
+    @BeforeAll
+    public static void initScheduleEngine() throws Exception {
+        scheduleEngine = new QuartzScheduleEngine();
+        scheduleEngine.start();
+    }
+
+    @Test
+    @Timeout(30)
+    public void testRegisterScheduleInfo() throws Exception {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        testRegister(scheduleInfo, false);
+
+        // 2. test for cron schedule
+        scheduleInfo = genDefaultCronScheduleInfo();
+        testRegister(scheduleInfo, true);
+    }
+
+    private void testRegister(ScheduleInfo scheduleInfo, boolean isCrontab) 
throws Exception {
+        // compute the expected number of job executions for this schedule
+        long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
+        // size the MockJob latch to that count; each execution counts it down
+        MockJob.setCount((int) expectCount);
+        // register the schedule with MockJob as the job class
+        scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
+        // exactly one job should be tracked by the engine and known to the scheduler
+        assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+        JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
+        boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
+        assertTrue(exist);
+        MockJob.countDownLatch.await();
+
+        // within 3 seconds the job should be gone from the engine's set and the scheduler
+        await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, scheduleEngine.getScheduledJobSet().size());
+            assertFalse(scheduleEngine.getScheduler().checkExists(jobKey));
+        });
+    }
+
+    @Test
+    @Timeout(30)
+    public void testUnRegisterScheduleInfo() throws Exception {
+        // 1. test for normal schedule (default: every 2 seconds over a 10-second span)
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        testUnRegister(scheduleInfo, false);
+
+        // 2. test for cron schedule, using the default cron expression */1 * * * * ?
+        scheduleInfo = genDefaultCronScheduleInfo();
+        testUnRegister(scheduleInfo, true);
+    }
+
+    private void testUnRegister(ScheduleInfo scheduleInfo, boolean isCrontab) 
throws Exception {
+        // compute the expected number of job executions for this schedule
+        long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
+        // wait for only half of the expected executions before un-registering
+        MockJob.setCount((int) (expectCount / 2));
+        // register the schedule with MockJob as the job class
+        scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
+        // exactly one job should be tracked by the engine and known to the scheduler
+        assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+        JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
+        boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
+        assertTrue(exist);
+        MockJob.countDownLatch.await();
+
+        // un-register once half of the expected executions have run
+        scheduleEngine.handleUnregister(scheduleInfo);
+        // no job should remain in the engine's set or the scheduler after un-registering
+        assertEquals(0, scheduleEngine.getScheduledJobSet().size());
+        exist = scheduleEngine.getScheduler().checkExists(jobKey);
+        assertFalse(exist);
+    }
+
+    @Test
+    @Timeout(50)
+    public void testUpdateScheduleInfo() throws Exception {
+        // 1. test for normal schedule
+        ScheduleInfo scheduleInfo =
+                genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), 
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
+        ScheduleInfo scheduleInfoToUpdate =
+                genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), 
DEFAULT_INTERVAL / 2, DEFAULT_SPAN_IN_MS);
+        testUpdate(scheduleInfo, scheduleInfoToUpdate, false);
+
+        // 2. test for cron schedule
+        scheduleInfo = genCronScheduleInfo(GROUP_ID, 
CRON_EXPRESSION_EVERY_TWO_SECONDS, DEFAULT_SPAN_IN_MS);
+        scheduleInfoToUpdate = genCronScheduleInfo(GROUP_ID, 
CRON_EXPRESSION_PER_SECONDS, DEFAULT_SPAN_IN_MS);
+        testUpdate(scheduleInfo, scheduleInfoToUpdate, true);
+    }
+
+    public void testUpdate(ScheduleInfo scheduleInfo, ScheduleInfo 
scheduleInfoToUpdate, boolean isCrontab)
+            throws Exception {
+        // wait for only half of the original schedule's expected executions
+        long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
+        MockJob.setCount((int) (expectCount / 2));
+        // register the original schedule with MockJob as the job class
+        scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
+        // exactly one job should be tracked by the engine and known to the scheduler
+        assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+        JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
+        boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
+        assertTrue(exist);
+        MockJob.countDownLatch.await();
+
+        // reset the latch to the updated schedule's full expected count, then apply the update
+        expectCount = calculateScheduleTimes(scheduleInfoToUpdate, isCrontab);
+        MockJob.setCount((int) expectCount);
+        scheduleEngine.handleUpdate(scheduleInfoToUpdate, MockJob.class);
+
+        // the job should still be scheduled right after the update
+        assertEquals(1, scheduleEngine.getScheduledJobSet().size());
+        exist = scheduleEngine.getScheduler().checkExists(jobKey);
+        assertTrue(exist);
+
+        MockJob.countDownLatch.await();
+
+        // within 3 seconds the job should be gone from the engine's set and the scheduler
+        await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(0, scheduleEngine.getScheduledJobSet().size());
+            assertFalse(scheduleEngine.getScheduler().checkExists(jobKey));
+        });
+    }
+}
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
new file mode 100644
index 0000000000..77b84f3064
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.schedule.util;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.schedule.BaseScheduleTest;
+import org.apache.inlong.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
+
+import org.junit.jupiter.api.Test;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.ScheduleBuilder;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+
+import java.util.Date;
+
+import static org.apache.inlong.schedule.ScheduleUnit.DAY;
+import static org.apache.inlong.schedule.ScheduleUnit.HOUR;
+import static org.apache.inlong.schedule.ScheduleUnit.MINUTE;
+import static org.apache.inlong.schedule.ScheduleUnit.MONTH;
+import static org.apache.inlong.schedule.ScheduleUnit.ONE_WAY;
+import static org.apache.inlong.schedule.ScheduleUnit.WEEK;
+import static org.apache.inlong.schedule.ScheduleUnit.YEAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ScheduleUtilsTest extends BaseScheduleTest {
+
+    @Test
+    public void testGenScheduleBuilder() {
+        ScheduleBuilder<SimpleTrigger> builder =
+                ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, 
YEAR.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        builder = 
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, MONTH.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        builder = 
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, WEEK.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        builder = 
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, DAY.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        builder = 
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, HOUR.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        builder = 
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, 
MINUTE.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        builder = 
ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, 
ONE_WAY.getUnit());
+        assertNotNull(builder);
+        assertInstanceOf(SimpleScheduleBuilder.class, builder);
+
+        try {
+            ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, 
ILLEGAL_TIMEUNIT);
+        } catch (Exception e) {
+            assertInstanceOf(QuartzScheduleException.class, e);
+        }
+
+        ScheduleBuilder<CronTrigger> cronBuilder = 
ScheduleUtils.genCronQuartzScheduleBuilder(
+                CRON_EXPRESSION_EVERY_TWO_SECONDS);
+        assertNotNull(cronBuilder);
+        assertInstanceOf(CronScheduleBuilder.class, cronBuilder);
+
+        try {
+            
ScheduleUtils.genCronQuartzScheduleBuilder(ILLEGAL_CRON_EXPRESSION);
+        } catch (Exception e) {
+            String errorMsg = e.getMessage();
+            assertTrue(errorMsg.contains(ILLEGAL_CRON_EXPRESSION));
+        }
+    }
+
+    @Test
+    public void testGenJobDetail() {
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
QuartzOfflineSyncJob.class);
+        assertNotNull(jobDetail);
+
+        JobKey jobKey = jobDetail.getKey();
+        assertNotNull(jobKey);
+
+        String identity = jobKey.getName();
+        assertEquals(scheduleInfo.getInlongGroupId(), identity);
+    }
+
+    @Test
+    public void testGenCronTrigger() {
+        // normal
+        ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
QuartzOfflineSyncJob.class);
+
+        Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail, 
scheduleInfo);
+        assertNotNull(trigger);
+
+        TriggerKey triggerKey = trigger.getKey();
+        assertNotNull(triggerKey);
+        String identity = triggerKey.getName();
+        assertEquals(scheduleInfo.getInlongGroupId(), identity);
+
+        ScheduleBuilder<? extends Trigger> scheduleBuilder = 
trigger.getScheduleBuilder();
+        assertInstanceOf(SimpleScheduleBuilder.class, scheduleBuilder);
+
+        Date startDate = trigger.getStartTime();
+        assertNotNull(startDate);
+        assertEquals(startDate.getTime(), 
scheduleInfo.getStartTime().getTime());
+
+        Date endDate = trigger.getEndTime();
+        assertNotNull(endDate);
+        assertEquals(endDate.getTime(), scheduleInfo.getEndTime().getTime());
+
+        // cron
+        scheduleInfo = genDefaultCronScheduleInfo();
+        jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
QuartzOfflineSyncJob.class);
+
+        trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
+        assertNotNull(trigger);
+
+        triggerKey = trigger.getKey();
+        assertNotNull(triggerKey);
+        identity = triggerKey.getName();
+        assertEquals(scheduleInfo.getInlongGroupId(), identity);
+
+        scheduleBuilder = trigger.getScheduleBuilder();
+        assertInstanceOf(CronScheduleBuilder.class, scheduleBuilder);
+
+        startDate = trigger.getStartTime();
+        assertNotNull(startDate);
+        assertEquals(startDate.getTime(), 
scheduleInfo.getStartTime().getTime());
+
+        endDate = trigger.getEndTime();
+        assertNotNull(endDate);
+        assertEquals(endDate.getTime(), scheduleInfo.getEndTime().getTime());
+
+    }
+}
diff --git a/inlong-manager/manager-schedule/src/test/resources/log4j2.xml 
b/inlong-manager/manager-schedule/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..8f2da663e7
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/test/resources/log4j2.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration status="WARN" monitorInterval="30">
+    <Properties>
+        <property name="basePath">logs</property>
+        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%5.30t] 
%-30.30C{1.}:%L - %m%n</property>
+        <property name="output_log_level">DEBUG</property>
+        <property 
name="all_fileName">${basePath}/manager-service-ut.log</property>
+        <property name="console_print_level">DEBUG</property>
+    </Properties>
+
+    <appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" 
onMismatch="DENY"/>
+            <PatternLayout pattern="${log_pattern}"/>
+            <follow>true</follow>
+        </Console>
+        <File name="AllFile" fileName="${all_fileName}">
+            <PatternLayout pattern="${log_pattern}"/>
+        </File>
+    </appenders>
+
+    <loggers>
+        <root level="${output_log_level}">
+            <appender-ref ref="Console"/>
+            <appender-ref ref="AllFile"/>
+        </root>
+    </loggers>
+</configuration>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index a8b545f7bf..babe5139f8 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -970,7 +970,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
     `inlong_group_id`        varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
     `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
-    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule unit, 
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
     `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
     `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
     `end_time`               timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'End time for schedule',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f7dc1c0a90..b4458af76f 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1022,7 +1022,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
     `inlong_group_id`        varchar(256) NOT NULL COMMENT 'Inlong group id, 
undeleted ones cannot be repeated',
     `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
-    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule unit, 
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
     `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
     `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
     `end_time`               timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'End time for schedule',
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql 
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index 38e7627c5e..6a248ef0a2 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -97,7 +97,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 
'Incremental primary key',
     `inlong_group_id`               varchar(256) NOT NULL COMMENT 'Inlong 
group id, undeleted ones cannot be repeated',
     `schedule_type`          int(4)       NOT NULL DEFAULT '0' COMMENT 
'Schedule type, 0 for normal, 1 for crontab',
-    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule 
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+    `schedule_unit`          varchar(64)  NOT NULL COMMENT 'Schedule unit, 
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
     `schedule_interval`      int(11)      DEFAULT '1' COMMENT 'Schedule 
interval',
     `start_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'Start time for schedule',
     `end_time`               timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP 
COMMENT 'End time for schedule',
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index bff220728b..cc7421d1c1 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -40,8 +40,8 @@
         <module>manager-service</module>
         <module>manager-workflow</module>
         <module>manager-web</module>
-        <module>manager-docker</module>
         <module>manager-schedule</module>
+        <module>manager-docker</module>
     </modules>
 
     <properties>

Reply via email to