This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e3beb8c8456e9ad5f9c6d4918c0ecd6d18623d3d
Author: AloysZhang <[email protected]>
AuthorDate: Wed Jun 19 15:41:34 2024 +0800

    [INLONG-10360][Manager] Combine schedule state transition with group operations (#10445)
    
    Co-authored-by: fuweng11 <[email protected]>
---
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   2 +
 .../manager/common/enums/ScheduleStatus.java       |  16 +-
 .../inlong/manager/pojo/group/InlongGroupInfo.java |  31 ++++
 .../manager/pojo/group/InlongGroupRequest.java     |  31 ++++
 .../inlong/manager/pojo/schedule/ScheduleInfo.java |  29 ++++
 .../manager/pojo/schedule/ScheduleInfoRequest.java |  28 ++++
 ...leEngineClient.java => NoopScheduleClient.java} |  45 +++---
 .../manager/schedule/ScheduleClientFactory.java    |  55 +++++++
 .../inlong/manager/schedule/ScheduleEngine.java    |   4 +-
 .../manager/schedule/ScheduleEngineClient.java     |  12 +-
 .../manager/schedule/ScheduleEngineType.java}      |  28 +---
 .../schedule/quartz/QuartzScheduleClient.java      |  17 +-
 .../schedule/quartz/QuartzScheduleEngine.java      |  21 +--
 .../manager/schedule/util/ScheduleUtils.java       |   4 +-
 .../inlong/manager/schedule/quartz/MockJob.java    |   3 +-
 .../schedule/quartz/QuartzScheduleEngineTest.java  |   2 +-
 .../manager/schedule/util/ScheduleUtilsTest.java   |   8 +-
 inlong-manager/manager-service/pom.xml             |   5 +
 .../service/group/InlongGroupServiceImpl.java      |  57 ++++++-
 .../schedule/GroupScheduleResourceListener.java    |   8 +-
 .../manager/service/schedule/ScheduleOperator.java |  95 +++++++++++
 .../service/schedule/ScheduleOperatorImpl.java     | 175 +++++++++++++++++++++
 .../manager/service/schedule/ScheduleService.java  |  11 ++
 .../service/schedule/ScheduleServiceImpl.java      |  54 ++++++-
 .../web/controller/InLongSchedulerController.java  |  24 ++-
 .../src/main/resources/application-dev.properties  |   6 +-
 .../src/main/resources/application-prod.properties |   3 +
 .../src/main/resources/application-test.properties |   3 +
 .../src/main/resources/application.properties      |   4 +
 29 files changed, 698 insertions(+), 83 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 6a8f9b4699..2a82325ad9 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -127,6 +127,8 @@ public enum ErrorCodeEnum {
 
     SCHEDULE_NOT_FOUND(1700, "Schedule info not found"),
     SCHEDULE_DUPLICATE(1701, "Schedule info already exist"),
+    SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"),
+    SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition 
is not allowed"),
 
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
     WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no 
operation authority"),
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
index cb256a491c..2d936b5532 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
@@ -19,11 +19,25 @@ package org.apache.inlong.manager.common.enums;
 
 import lombok.Getter;
 
+/**
+ * Status for schedule info.
+ * This is the transient status of the schedule info.
+ * With specified operations, the status will change to corresponding value.
+ *  Status                Operations
+ *  NEW                   inlong group created with schedule info
+ *  APPROVED              the new inlong group approved by admin
+ *  REGISTERED            schedule info registered to schedule engine
+ *  UPDATED               update schedule info for a group
+ *  DELETED               delete a group
+ * */
 @Getter
 public enum ScheduleStatus {
 
     NEW(100, "new"),
-    DELETED(40, "deleted");
+    APPROVED(101, "approved"),
+    REGISTERED(102, "registered"),
+    UPDATED(103, "updated"),
+    DELETED(99, "deleted");
 
     private final Integer code;
     private final String description;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index 17b66497a9..0fb0479196 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -30,6 +30,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
+import java.sql.Timestamp;
 import java.util.Date;
 import java.util.List;
 
@@ -137,6 +138,36 @@ public abstract class InlongGroupInfo extends 
BaseInlongGroup {
     @ApiModelProperty(value = "Inlong tenant")
     private String tenant;
 
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
+    // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String scheduleUnit;
+
+    @ApiModelProperty("Schedule interval")
+    private Integer scheduleInterval;
+
+    @ApiModelProperty("Start time")
+    private Timestamp startTime;
+
+    @ApiModelProperty("End time")
+    private Timestamp endTime;
+
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer crontabExpression;
+
     public abstract InlongGroupRequest genRequest();
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 6140bddad5..1adc210a0a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -34,6 +34,7 @@ import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
 import javax.validation.constraints.Pattern;
 
+import java.sql.Timestamp;
 import java.util.List;
 
 /**
@@ -130,4 +131,34 @@ public abstract class InlongGroupRequest extends 
BaseInlongGroup {
     @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
     private Integer version;
 
+    // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+    @ApiModelProperty("Schedule type")
+    private Integer scheduleType;
+
+    // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
+    // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
+    @ApiModelProperty("TimeUnit for schedule interval")
+    private String scheduleUnit;
+
+    @ApiModelProperty("Schedule interval")
+    private Integer scheduleInterval;
+
+    @ApiModelProperty("Start time")
+    private Timestamp startTime;
+
+    @ApiModelProperty("End time")
+    private Timestamp endTime;
+
+    @ApiModelProperty("Delay time")
+    private Integer delayTime;
+
+    @ApiModelProperty("Self depend")
+    private Integer selfDepend;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer taskParallelism;
+
+    @ApiModelProperty("Schedule task parallelism")
+    private Integer crontabExpression;
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index 2386d817bb..24f2e6196e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -29,6 +29,7 @@ import lombok.NoArgsConstructor;
 import javax.validation.constraints.NotNull;
 
 import java.sql.Timestamp;
+import java.util.Objects;
 
 @Data
 @Builder
@@ -79,4 +80,32 @@ public class ScheduleInfo {
     @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
     private Integer version;
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ScheduleInfo that = (ScheduleInfo) o;
+        return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, 
that.inlongGroupId)
+                && Objects.equals(scheduleType, that.scheduleType) && 
Objects.equals(scheduleUnit,
+                        that.scheduleUnit)
+                && Objects.equals(scheduleInterval, that.scheduleInterval)
+                && Objects.equals(startTime, that.startTime) && 
Objects.equals(endTime, that.endTime)
+                && Objects.equals(delayTime, that.delayTime) && 
Objects.equals(selfDepend,
+                        that.selfDepend)
+                && Objects.equals(taskParallelism, that.taskParallelism)
+                && Objects.equals(crontabExpression, that.crontabExpression) 
&& Objects.equals(version,
+                        that.version);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, 
scheduleInterval, startTime, endTime,
+                delayTime,
+                selfDepend, taskParallelism, crontabExpression, version);
+    }
+
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index b3c117da9a..a324cee7f5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -26,6 +26,7 @@ import lombok.Data;
 import javax.validation.constraints.NotNull;
 
 import java.sql.Timestamp;
+import java.util.Objects;
 
 @Data
 @ApiModel("Schedule request")
@@ -73,4 +74,31 @@ public class ScheduleInfoRequest {
     @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
     private Integer version;
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ScheduleInfoRequest that = (ScheduleInfoRequest) o;
+        return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, 
that.inlongGroupId)
+                && Objects.equals(scheduleType, that.scheduleType) && 
Objects.equals(scheduleUnit,
+                        that.scheduleUnit)
+                && Objects.equals(scheduleInterval, that.scheduleInterval)
+                && Objects.equals(startTime, that.startTime) && 
Objects.equals(endTime, that.endTime)
+                && Objects.equals(delayTime, that.delayTime) && 
Objects.equals(selfDepend,
+                        that.selfDepend)
+                && Objects.equals(taskParallelism, that.taskParallelism)
+                && Objects.equals(crontabExpression, that.crontabExpression) 
&& Objects.equals(version,
+                        that.version);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, 
scheduleInterval, startTime, endTime,
+                delayTime,
+                selfDepend, taskParallelism, crontabExpression, version);
+    }
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
similarity index 58%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
copy to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
index dee5cbb2db..a122235de3 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
@@ -19,27 +19,28 @@ package org.apache.inlong.manager.schedule;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 
-/**
- * Interface for schedule engine client which responses for communicating with 
schedule engine.
- * */
-public interface ScheduleEngineClient {
-
-    /**
-     * Register schedule to schedule engine.
-     * @param scheduleInfo schedule info to register
-     * */
-    boolean register(ScheduleInfo scheduleInfo);
-
-    /**
-     * Un-register schedule from schedule engine.
-     * @param scheduleInfo schedule info to unregister
-     * */
-    boolean unregister(ScheduleInfo scheduleInfo);
-
-    /**
-     * Update schedule from schedule engine.
-     * @param scheduleInfo schedule info to update
-     * */
-    boolean update(ScheduleInfo scheduleInfo);
+import org.springframework.stereotype.Service;
 
+@Service
+public class NoopScheduleClient implements ScheduleEngineClient {
+
+    @Override
+    public boolean accept(String engineType) {
+        return ScheduleEngineType.NONE.getType().equals(engineType);
+    }
+
+    @Override
+    public boolean register(ScheduleInfo scheduleInfo) {
+        return true;
+    }
+
+    @Override
+    public boolean unregister(String groupId) {
+        return true;
+    }
+
+    @Override
+    public boolean update(ScheduleInfo scheduleInfo) {
+        return true;
+    }
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
new file mode 100644
index 0000000000..13f87b3c45
--- /dev/null
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Optional;
+
+@Service
+public class ScheduleClientFactory {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ScheduleClientFactory.class);
+
+    @Value("${inlong.schedule.engine:none}")
+    private String scheduleEngineName;
+
+    @Autowired
+    List<ScheduleEngineClient> scheduleEngineClients;
+
+    public ScheduleEngineClient getInstance() {
+        Optional<ScheduleEngineClient> optScheduleClient =
+                scheduleEngineClients.stream().filter(t -> 
t.accept(scheduleEngineName)).findFirst();
+        if (!optScheduleClient.isPresent()) {
+            LOGGER.warn("Schedule engine client not found for {} ", 
scheduleEngineName);
+            throw new 
BusinessException(ErrorCodeEnum.SCHEDULE_ENGINE_NOT_SUPPORTED,
+                    
String.format(ErrorCodeEnum.SCHEDULE_ENGINE_NOT_SUPPORTED.getMessage(), 
scheduleEngineName));
+        }
+        LOGGER.info("Get schedule engine client success for {}", 
scheduleEngineName);
+        return optScheduleClient.get();
+    }
+
+}
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
index 1f52e280de..bc0c963b38 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
@@ -37,9 +37,9 @@ public interface ScheduleEngine {
 
     /**
      * Handle schedule unregister.
-     * @param scheduleInfo schedule info to unregister
+     * @param groupId group to un-register schedule info
      * */
-    boolean handleUnregister(ScheduleInfo scheduleInfo);
+    boolean handleUnregister(String groupId);
 
     /**
      * Handle schedule update.
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
index dee5cbb2db..9c6cf081d4 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
@@ -24,6 +24,11 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
  * */
 public interface ScheduleEngineClient {
 
+    /**
+     * Check whether this client handles the given schedule engine type.
+     * */
+    boolean accept(String engineType);
+
     /**
      * Register schedule to schedule engine.
      * @param scheduleInfo schedule info to register
@@ -32,9 +37,10 @@ public interface ScheduleEngineClient {
 
     /**
      * Un-register schedule from schedule engine.
-     * @param scheduleInfo schedule info to unregister
-     * */
-    boolean unregister(ScheduleInfo scheduleInfo);
+     *
+     * @param groupId id of the inlong group whose schedule info is to be unregistered
+     */
+    boolean unregister(String groupId);
 
     /**
      * Update schedule from schedule engine.
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
similarity index 57%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
copy to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
index cb256a491c..71949ef744 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
@@ -15,31 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.schedule;
 
 import lombok.Getter;
 
 @Getter
-public enum ScheduleStatus {
+public enum ScheduleEngineType {
 
-    NEW(100, "new"),
-    DELETED(40, "deleted");
+    NONE("None"),
+    QUARTZ("Quartz");
 
-    private final Integer code;
-    private final String description;
+    private final String type;
 
-    ScheduleStatus(Integer code, String description) {
-        this.code = code;
-        this.description = description;
+    ScheduleEngineType(String type) {
+        this.type = type;
     }
-
-    public static ScheduleStatus forCode(int code) {
-        for (ScheduleStatus status : values()) {
-            if (status.getCode() == code) {
-                return status;
-            }
-        }
-        throw new IllegalStateException(String.format("Illegal code=%s for 
ScheduleStatus", code));
-    }
-
-}
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
index 05aa6c01ae..6b7afe784f 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
@@ -19,18 +19,25 @@ package org.apache.inlong.manager.schedule.quartz;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.schedule.ScheduleEngineType;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * Built-in implementation of schedule engine client corresponding with {@link QuartzScheduleEngine}.
  * QuartzScheduleClient simply invokes the {@link QuartzScheduleEngine} to register/unregister/update
  * schedule info instead of calling a remote schedule service.
  * */
+@Service
 public class QuartzScheduleClient implements ScheduleEngineClient {
 
-    private final QuartzScheduleEngine scheduleEngine;
+    @Autowired
+    public QuartzScheduleEngine scheduleEngine;
 
-    public QuartzScheduleClient(QuartzScheduleEngine scheduleEngine) {
-        this.scheduleEngine = scheduleEngine;
+    @Override
+    public boolean accept(String engineType) {
+        return 
ScheduleEngineType.QUARTZ.getType().equalsIgnoreCase(engineType);
     }
 
     @Override
@@ -39,8 +46,8 @@ public class QuartzScheduleClient implements 
ScheduleEngineClient {
     }
 
     @Override
-    public boolean unregister(ScheduleInfo scheduleInfo) {
-        return scheduleEngine.handleUnregister(scheduleInfo);
+    public boolean unregister(String groupId) {
+        return scheduleEngine.handleUnregister(groupId);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
index d9d2620211..e8bb1085ce 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
 
 import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
+import org.quartz.Job;
 import org.quartz.JobDetail;
 import org.quartz.JobKey;
 import org.quartz.Scheduler;
@@ -31,6 +32,7 @@ import org.quartz.Trigger;
 import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -43,6 +45,7 @@ import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzTri
  * the register/unregister/update requests from {@link QuartzScheduleClient}
  * */
 @Getter
+@Service
 public class QuartzScheduleEngine implements ScheduleEngine {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QuartzScheduleEngine.class);
@@ -90,7 +93,7 @@ public class QuartzScheduleEngine implements ScheduleEngine {
     }
 
     @VisibleForTesting
-    public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends 
QuartzOfflineSyncJob> clz) {
+    public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends 
Job> clz) {
         if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
             throw new QuartzScheduleException("Group " + 
scheduleInfo.getInlongGroupId() + " is already registered");
         }
@@ -108,19 +111,19 @@ public class QuartzScheduleEngine implements 
ScheduleEngine {
 
     /**
      * Handle schedule unregister.
-     * @param scheduleInfo schedule info to unregister
+     * @param groupId group to un-register schedule info
      * */
     @Override
-    public boolean handleUnregister(ScheduleInfo scheduleInfo) {
-        if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
+    public boolean handleUnregister(String groupId) {
+        if (scheduledJobSet.contains(groupId)) {
             try {
-                scheduler.deleteJob(new 
JobKey(scheduleInfo.getInlongGroupId()));
+                scheduler.deleteJob(new JobKey(groupId));
             } catch (SchedulerException e) {
                 throw new QuartzScheduleException(e.getMessage());
             }
         }
-        scheduledJobSet.remove(scheduleInfo.getInlongGroupId());
-        LOGGER.info("Un-registered schedule info for {}", 
scheduleInfo.getInlongGroupId());
+        scheduledJobSet.remove(groupId);
+        LOGGER.info("Un-registered schedule info for {}", groupId);
         return true;
     }
 
@@ -134,8 +137,8 @@ public class QuartzScheduleEngine implements ScheduleEngine 
{
     }
 
     @VisibleForTesting
-    public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends 
QuartzOfflineSyncJob> clz) {
-        handleUnregister(scheduleInfo);
+    public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends 
Job> clz) {
+        handleUnregister(scheduleInfo.getInlongGroupId());
         handleRegister(scheduleInfo, clz);
         LOGGER.info("Updated schedule info for {}", 
scheduleInfo.getInlongGroupId());
         return false;
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
index 65475a0b98..1e4f43983e 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
@@ -21,11 +21,11 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.schedule.ScheduleType;
 import org.apache.inlong.manager.schedule.ScheduleUnit;
 import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
-import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob;
 
 import org.apache.commons.lang3.StringUtils;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.CronTrigger;
+import org.quartz.Job;
 import org.quartz.JobBuilder;
 import org.quartz.JobDetail;
 import org.quartz.ScheduleBuilder;
@@ -42,7 +42,7 @@ import java.util.Date;
  * */
 public class ScheduleUtils {
 
-    public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, 
Class<? extends QuartzOfflineSyncJob> clz) {
+    public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, 
Class<? extends Job> clz) {
         return JobBuilder.newJob(clz)
                 .withIdentity(scheduleInfo.getInlongGroupId())
                 .build();
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
index 9202ea5b40..bc0fecf99a 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.schedule.quartz;
 
+import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 import org.slf4j.Logger;
@@ -25,7 +26,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MockJob extends QuartzOfflineSyncJob {
+public class MockJob implements Job {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MockJob.class);
 
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
index 008a7e42f8..11e7580f28 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
@@ -103,7 +103,7 @@ public class QuartzScheduleEngineTest extends 
BaseScheduleTest {
         MockJob.countDownLatch.await();
 
         // un-register before trigger finalized
-        scheduleEngine.handleUnregister(scheduleInfo);
+        scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId());
         // not job exist after un-register
         assertEquals(0, scheduleEngine.getScheduledJobSet().size());
         exist = scheduleEngine.getScheduler().checkExists(jobKey);
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
index 415331be3d..da8fd66c61 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.schedule.util;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.schedule.BaseScheduleTest;
 import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
-import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob;
+import org.apache.inlong.manager.schedule.quartz.MockJob;
 
 import org.junit.jupiter.api.Test;
 import org.quartz.CronScheduleBuilder;
@@ -102,7 +102,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
     @Test
     public void testGenJobDetail() {
         ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
-        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
QuartzOfflineSyncJob.class);
+        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class);
         assertNotNull(jobDetail);
 
         JobKey jobKey = jobDetail.getKey();
@@ -116,7 +116,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
     public void testGenCronTrigger() {
         // normal
         ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
-        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
QuartzOfflineSyncJob.class);
+        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class);
 
         Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail, 
scheduleInfo);
         assertNotNull(trigger);
@@ -139,7 +139,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
 
         // cron
         scheduleInfo = genDefaultCronScheduleInfo();
-        jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
QuartzOfflineSyncJob.class);
+        jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class);
 
         trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
         assertNotNull(trigger);
diff --git a/inlong-manager/manager-service/pom.xml 
b/inlong-manager/manager-service/pom.xml
index 1b300dfc34..ae42b902b7 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -52,6 +52,11 @@
             <artifactId>manager-workflow</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>manager-schedule</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>manager-test</artifactId>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index c9f4019a4d..d42a61dbfb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -59,6 +59,8 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
@@ -72,6 +74,7 @@ import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.schedule.ScheduleOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.StreamSourceOperator;
@@ -111,6 +114,7 @@ import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG
 import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
 import static 
org.apache.inlong.common.constant.ClusterSwitch.CLUSTER_SWITCH_TIME;
 import static 
org.apache.inlong.common.constant.ClusterSwitch.FINISH_SWITCH_INTERVAL_MIN;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.DATASYNC_OFFLINE_MODE;
 import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
 import static 
org.apache.inlong.manager.workflow.event.process.ProcessEventListener.EXECUTOR_SERVICE;
 
@@ -158,6 +162,9 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     @Autowired
     private TenantUserRoleEntityMapper tenantUserRoleEntityMapper;
 
+    @Autowired
+    ScheduleOperator scheduleOperator;
+
     /**
      * Check whether modification is supported under the current group status, 
and which fields can be modified.
      *
@@ -208,6 +215,11 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         // save ext info
         this.saveOrUpdateExt(groupId, request.getExtList());
 
+        // save schedule info for offline group
+        if (DATASYNC_OFFLINE_MODE.equals(request.getInlongGroupMode())) {
+            scheduleOperator.saveOpt(CommonBeanUtils.copyProperties(request, 
ScheduleInfoRequest::new), operator);
+        }
+
         LOGGER.info("success to save inlong group for groupId={} by user={}", 
groupId, operator);
         return groupId;
     }
@@ -239,7 +251,15 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         Preconditions.expectNotNull(groupId, 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         LOGGER.debug("success to check inlong group {}, exist? {}", groupId, 
entity != null);
-        return entity != null;
+        if (entity == null) {
+            return false;
+        }
+        return isScheduleInfoExist(entity);
+    }
+
+    private boolean isScheduleInfoExist(InlongGroupEntity entity) {
+        return !DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode()) // other modes need no schedule info
+                || scheduleOperator.scheduleInfoExist(entity.getInlongGroupId());
+    }
 
     @Override
@@ -261,11 +281,29 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         List<InlongStreamExtEntity> streamExtEntities = 
streamExtMapper.selectByRelatedId(groupId, null);
         BaseSortConf sortConf = buildSortConfig(streamExtEntities);
         groupInfo.setSortConf(sortConf);
-
+        if (DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode())) {
+            // get schedule info and set into group info
+            addScheduleInfo(entity, groupInfo);
+        }
         LOGGER.debug("success to get inlong group for groupId={}", groupId);
         return groupInfo;
     }
 
+    private void addScheduleInfo(InlongGroupEntity entity, InlongGroupInfo 
groupInfo) {
+        checkOfflineSyncScheduleExist(entity);
+        ScheduleInfo scheduleInfo = 
scheduleOperator.getScheduleInfo(entity.getInlongGroupId());
+        CommonBeanUtils.copyProperties(scheduleInfo, groupInfo);
+    }
+
+    private void checkOfflineSyncScheduleExist(InlongGroupEntity entity) {
+        // check schedule info for offline sync
+        if (!isScheduleInfoExist(entity)) {
+            String errorMsg = String.format("Schedule info not found for 
groupId=%s", entity.getInlongGroupId());
+            LOGGER.error(errorMsg);
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND, 
errorMsg);
+        }
+    }
+
     @Override
     public String getTenant(String groupId, String operator) {
         InlongGroupEntity groupEntity = 
groupMapper.selectByGroupIdWithoutTenant(groupId);
@@ -457,6 +495,12 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         // save ext info
         this.saveOrUpdateExt(groupId, request.getExtList());
 
+        // update schedule info for offline group and register it to the schedule engine
+        if (DATASYNC_OFFLINE_MODE.equals(request.getInlongGroupMode())) {
+            
scheduleOperator.updateAndRegister(CommonBeanUtils.copyProperties(request, 
ScheduleInfoRequest::new),
+                    operator);
+        }
+
         LOGGER.info("success to update inlong group for groupId={} by 
user={}", groupId, operator);
         return groupId;
     }
@@ -612,6 +656,15 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         // logically delete the associated extension info
         groupExtMapper.logicDeleteAllByGroupId(groupId);
 
+        // remove schedule
+        if (DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode())) {
+            try {
+                scheduleOperator.deleteByGroupIdOpt(entity.getInlongGroupId(), 
operator);
+            } catch (Exception e) {
+                LOGGER.warn("failed to delete schedule info for groupId={}, 
error msg: {}", groupId, e.getMessage());
+            }
+        }
+
         LOGGER.info("success to delete group and group ext property for 
groupId={} by user={}", groupId, operator);
         return true;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
index e27dab9e89..39d3a2a3bf 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
@@ -24,11 +24,13 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.service.schedule.ScheduleOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
@@ -37,6 +39,9 @@ import java.util.List;
 @Slf4j
 public class GroupScheduleResourceListener implements ScheduleOperateListener {
 
+    @Autowired
+    private ScheduleOperator scheduleOperator;
+
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -68,7 +73,8 @@ public class GroupScheduleResourceListener implements 
ScheduleOperateListener {
         final String groupId = groupInfo.getInlongGroupId();
         log.info("begin to register schedule info for groupId={}", groupId);
 
-        // todo: register schedule info to schedule service
+        // handle schedule info after group approved
+        scheduleOperator.handleGroupApprove(groupId);
 
         // after register schedule info successfully, add ext property to 
group ext info
         saveInfo(groupInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
new file mode 100644
index 0000000000..6bf9c01432
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+/**
+ * Operator for schedule. Including:
+ * 1. schedule info management
+ * 2. schedule operations like register, un-register etc.
+ * */
+public interface ScheduleOperator {
+
+    /**
+     * Save schedule info.
+     * There are two places that may save schedule info:
+     * - 1. create new inlong group with schedule info
+     * - 2. create new schedule info directly (the inlong group already exists); in this situation, we should
+     *      register schedule info to schedule engine if group has been approved.
+     * @param request schedule request need to save
+     * @param operator name of operator
+     * @return schedule info id in backend storage
+     */
+    int saveOpt(ScheduleInfoRequest request, String operator);
+
+    /**
+     * Check whether schedule info exists for specified inlong group
+     *
+     * @param groupId the group id to be queried
+     * @return does it exist
+     */
+    Boolean scheduleInfoExist(String groupId);
+
+    /**
+     * Get schedule info based on inlong group id
+     *
+     * @param groupId inlong group id
+     * @return detail of inlong group
+     */
+    ScheduleInfo getScheduleInfo(String groupId);
+
+    /**
+     * Modify schedule information
+     * There are two places that may update schedule info:
+     * - 1. update inlong group with new schedule info
+     * - 2. update schedule info directly (the inlong group already exists)
+     * @param request schedule request that needs to be modified
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean updateOpt(ScheduleInfoRequest request, String operator);
+
+    /**
+     * Update schedule information and register it to the schedule engine
+     * @param request schedule request that needs to be modified
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean updateAndRegister(ScheduleInfoRequest request, String operator);
+
+    /**
+     * Delete schedule info for groupId.
+     * There are two places that may delete schedule info:
+     * - 1. delete an inlong group
+     * - 2. delete schedule info directly, leaving the inlong group without schedule info, which means the
+     * offline sync job of the group will never be triggered
+     * @param groupId groupId to find a schedule info to delete
+     * @param operator  name of operator
+     * @return whether succeed
+     * */
+    Boolean deleteByGroupIdOpt(String groupId, String operator);
+
+    /**
+     * Handle inlong group approve, check schedule info and try to register it 
to schedule engine.
+     * @param groupId groupId to find a schedule info to register
+     * @return whether succeed
+     * */
+    Boolean handleGroupApprove(String groupId);
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
new file mode 100644
index 0000000000..411a80766a
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.schedule.ScheduleClientFactory;
+import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED;
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED;
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED;
+
+@Service
+public class ScheduleOperatorImpl implements ScheduleOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ScheduleOperatorImpl.class);
+
+    @Autowired
+    private ScheduleService scheduleService;
+
+    @Autowired
+    private InlongGroupExtEntityMapper groupExtMapper;
+
+    @Autowired
+    private ScheduleEntityMapper scheduleMapper;
+
+    @Autowired
+    private ScheduleClientFactory scheduleClientFactory;
+
+    private ScheduleEngineClient scheduleEngineClient;
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public int saveOpt(ScheduleInfoRequest request, String operator) {
+        // save schedule info first
+        int scheduleInfoId = scheduleService.save(request, operator);
+        LOGGER.info("Save schedule info success for group {}", 
request.getInlongGroupId());
+        // process new schedule info for approved inlong group
+        
registerScheduleInfoForApprovedGroup(CommonBeanUtils.copyProperties(request, 
ScheduleInfo::new), operator);
+        return scheduleInfoId;
+    }
+
+    /**
+     * If an inlong group in DATASYNC_OFFLINE_MODE was created without schedule info and has already been approved,
+     * register the schedule info to the schedule engine as soon as it is added; otherwise do nothing.
+     * */
+    private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, String operator) {
+        String groupId = scheduleInfo.getInlongGroupId();
+        InlongGroupExtEntity scheduleStatusExt =
+                groupExtMapper.selectByUniqueKey(groupId, InlongConstants.REGISTER_SCHEDULE_STATUS);
+        if (scheduleStatusExt != null && InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
+            // the status ext can be absent (e.g. a group that was just created), hence the null guard above
+            scheduleService.updateStatus(groupId, APPROVED, operator);
+            registerToScheduleEngine(scheduleInfo, operator, false);
+            LOGGER.info("Register schedule info success for group {}", groupId);
+        }
+    }
+
+    private ScheduleEngineClient getScheduleEngineClient() {
+        if (scheduleEngineClient == null) {
+            scheduleEngineClient = scheduleClientFactory.getInstance();
+        }
+        return scheduleEngineClient;
+    }
+
+    @Override
+    public Boolean scheduleInfoExist(String groupId) {
+        return scheduleService.exist(groupId);
+    }
+
+    @Override
+    public ScheduleInfo getScheduleInfo(String groupId) {
+        return scheduleService.get(groupId);
+    }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean updateOpt(ScheduleInfoRequest request, String operator) {
+        // if the inlong group exist without schedule info
+        // then, save the new schedule info when updating inlong group
+        if (!scheduleInfoExist(request.getInlongGroupId())) {
+            saveOpt(request, operator);
+            return true;
+        }
+        ScheduleInfo scheduleInfo = CommonBeanUtils.copyProperties(request, 
ScheduleInfo::new);
+        if (!needUpdate(scheduleInfo)) {
+            LOGGER.info("schedule info not changed for group {}", 
request.getInlongGroupId());
+            return false;
+        }
+        // update schedule info
+        boolean res = scheduleService.update(request, operator);
+        // update status
+        scheduleService.updateStatus(request.getInlongGroupId(), UPDATED, 
operator);
+        return res;
+    }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean updateAndRegister(ScheduleInfoRequest request, String 
operator) {
+        updateOpt(request, operator);
+        return 
registerToScheduleEngine(CommonBeanUtils.copyProperties(request, 
ScheduleInfo::new), operator, true);
+    }
+
+    /**
+     * There are three places that may trigger registering schedule info to the schedule engine:
+     * - 1. new group approved with schedule info
+     * - 2. new schedule info added for an existing approved inlong group
+     * - 3. group's schedule info updated
+     * */
+    private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String 
operator, boolean isUpdate) {
+        // update(un-register and then register) or register
+        boolean res = isUpdate ? getScheduleEngineClient().update(scheduleInfo)
+                : getScheduleEngineClient().register(scheduleInfo);
+        // update status to REGISTERED
+        scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), 
REGISTERED, operator);
+        LOGGER.info("{} schedule info success for group {}",
+                isUpdate ? "Update" : "Register", 
scheduleInfo.getInlongGroupId());
+        return res;
+    }
+
+    private boolean needUpdate(ScheduleInfo scheduleInfo) {
+        if (scheduleInfo == null) {
+            return false;
+        }
+        ScheduleInfo existedSchedule = 
getScheduleInfo(scheduleInfo.getInlongGroupId());
+        return !scheduleInfo.equals(existedSchedule);
+    }
+
+    @Override
+    public Boolean deleteByGroupIdOpt(String groupId, String operator) {
+        return scheduleService.deleteByGroupId(groupId, operator);
+    }
+
+    @Override
+    public Boolean handleGroupApprove(String groupId) {
+        // if the approved inlong group has no schedule info yet,
+        // there is nothing to register, so only warn and report failure
+        if (!scheduleInfoExist(groupId)) {
+            LOGGER.warn("schedule info not exist for group {}", groupId);
+            return false;
+        }
+        ScheduleInfo scheduleInfo = getScheduleInfo(groupId);
+        // change schedule state to approved
+        scheduleService.updateStatus(groupId, APPROVED, null);
+        return registerToScheduleEngine(scheduleInfo, null, false);
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
index d00e7134d4..f3eb189c38 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.schedule;
 
+import org.apache.inlong.manager.common.enums.ScheduleStatus;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
 
@@ -61,6 +62,16 @@ public interface ScheduleService {
     Boolean update(@Valid @NotNull(message = "schedule request cannot be 
null") ScheduleInfoRequest request,
             String operator);
 
+    /**
+     * Update status of schedule info.
+     *
+     * @param groupId group to update schedule status
+     * @param newStatus status to update
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean updateStatus(String groupId, ScheduleStatus newStatus, String 
operator);
+
     /**
      * Delete schedule info for gropuId.
      * @param groupId groupId to find a schedule info to delete
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
index 480189da9e..6459a811cd 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
@@ -35,7 +35,18 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED;
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.DELETED;
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.NEW;
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED;
+import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED;
 
 @Service
 public class ScheduleServiceImpl implements ScheduleService {
@@ -47,6 +58,9 @@ public class ScheduleServiceImpl implements ScheduleService {
     @Autowired
     private ScheduleEntityMapper scheduleEntityMapper;
 
+    // finite state machine
+    private Map<ScheduleStatus, Set<ScheduleStatus>> fsm;
+
     @Override
     public int save(ScheduleInfoRequest request, String operator) {
         LOGGER.debug("begin to save schedule info, scheduleInfo={}, 
operator={}", request, operator);
@@ -62,7 +76,8 @@ public class ScheduleServiceImpl implements ScheduleService {
         scheduleEntity.setStatus(ScheduleStatus.NEW.getCode());
         scheduleEntity.setCreator(operator);
         scheduleEntity.setModifier(operator);
-        return scheduleEntityMapper.insert(scheduleEntity);
+        scheduleEntityMapper.insert(scheduleEntity);
+        return scheduleEntity.getId();
     }
 
     @Override
@@ -97,6 +112,43 @@ public class ScheduleServiceImpl implements ScheduleService 
{
         return true;
     }
 
+    @Override
+    public Boolean updateStatus(String groupId, ScheduleStatus newStatus, String operator) {
+        LOGGER.debug("begin to update schedule status for groupId={}", groupId);
+        ScheduleEntity entity = getScheduleEntity(groupId);
+        ScheduleStatus preStatus = ScheduleStatus.forCode(entity.getStatus());
+        if (!isAllowedTransition(preStatus, newStatus)) { // reject moves the state machine does not list
+            String errorMsg = String.format("Schedule status transition is not allowed from %s to %s for group %s",
+                    preStatus, newStatus, groupId);
+            LOGGER.error(errorMsg); // the same details are attached to the exception for the caller
+            throw new BusinessException(ErrorCodeEnum.SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED, errorMsg);
+        }
+        entity.setStatus(newStatus.getCode());
+        entity.setModifier(operator);
+        updateScheduleInfo(entity,
+                String.format("update schedule status from %s to %s failed for groupId=%s",
+                        preStatus.getCode(), newStatus.getCode(), entity.getInlongGroupId()));
+        LOGGER.info("success to update schedule status from {} to {} for groupId={}",
+                preStatus.getCode(), newStatus.getCode(), groupId);
+        return true;
+    }
+
+    private void initFSMIfNeed() {
+        if (fsm == null) {
+            Map<ScheduleStatus, Set<ScheduleStatus>> transitions = new HashMap<>();
+            transitions.put(NEW, new HashSet<>(Arrays.asList(APPROVED, DELETED)));
+            transitions.put(APPROVED, new HashSet<>(Arrays.asList(REGISTERED, DELETED)));
+            transitions.put(REGISTERED, new HashSet<>(Arrays.asList(UPDATED, DELETED)));
+            transitions.put(UPDATED, new HashSet<>(Arrays.asList(REGISTERED, DELETED)));
+            fsm = transitions; // assign last, so the field never refers to a table that is still being filled
+        }
+    }
+
+    private boolean isAllowedTransition(ScheduleStatus preStatus, ScheduleStatus newStatus) {
+        initFSMIfNeed(); // states with no outgoing transition (e.g. DELETED) have no entry in the table
+        return fsm.containsKey(preStatus) && fsm.get(preStatus).contains(newStatus);
+    }
+
     @Override
     public Boolean deleteByGroupId(String groupId, String operator) {
         LOGGER.debug("begin to delete schedule info for groupId={}", groupId);
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
index 801cc09b0e..4af07854c2 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
-import org.apache.inlong.manager.service.schedule.ScheduleService;
+import org.apache.inlong.manager.service.schedule.ScheduleOperator;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
@@ -45,35 +45,43 @@ import 
org.springframework.web.bind.annotation.RestController;
 public class InLongSchedulerController {
 
     @Autowired
-    private ScheduleService scheduleService;
+    private ScheduleOperator scheduleOperator;
 
     @RequestMapping(value = "/schedule/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SCHEDULE)
     @ApiOperation(value = "Save schedule info")
     public Response<Integer> save(@RequestBody ScheduleInfoRequest request) {
-        int result = scheduleService.save(request, 
LoginUserUtils.getLoginUser().getName());
-        return Response.success(result);
+        int scheduleInfoId = scheduleOperator.saveOpt(request, 
LoginUserUtils.getLoginUser().getName());
+        return Response.success(scheduleInfoId);
     }
 
     @RequestMapping(value = "/schedule/exist/{groupId}", method = 
RequestMethod.GET)
     @ApiOperation(value = "Is the schedule info exists for inlong group")
     @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required 
= true)
     public Response<Boolean> exist(@PathVariable String groupId) {
-        return Response.success(scheduleService.exist(groupId));
+        return Response.success(scheduleOperator.scheduleInfoExist(groupId));
     }
 
     @RequestMapping(value = "/schedule/get", method = RequestMethod.GET)
     @ApiOperation(value = "Get schedule info for inlong group")
     @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required 
= true)
     public Response<ScheduleInfo> get(@RequestParam String groupId) {
-        return Response.success(scheduleService.get(groupId));
+        return Response.success(scheduleOperator.getScheduleInfo(groupId));
     }
 
     @RequestMapping(value = "/schedule/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SCHEDULE)
     @ApiOperation(value = "Update schedule info")
     public Response<Boolean> update(@Validated(UpdateValidation.class) 
@RequestBody ScheduleInfoRequest request) {
-        return Response.success(scheduleService.update(request, 
LoginUserUtils.getLoginUser().getName()));
+        return Response.success(scheduleOperator.updateOpt(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
+    @RequestMapping(value = "/schedule/updateAndRegister", method = 
RequestMethod.POST)
+    @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SCHEDULE)
+    @ApiOperation(value = "Update schedule info and register to schedule 
engine")
+    public Response<Boolean> updateAndRegister(
+            @Validated(UpdateValidation.class) @RequestBody 
ScheduleInfoRequest request) {
+        return Response.success(scheduleOperator.updateAndRegister(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/schedule/delete/{groupId}", method = 
RequestMethod.DELETE)
@@ -82,7 +90,7 @@ public class InLongSchedulerController {
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class, required = true)
     public Response<Boolean> delete(@PathVariable String groupId) {
         String operator = LoginUserUtils.getLoginUser().getName();
-        return Response.success(scheduleService.deleteByGroupId(groupId, 
operator));
+        return Response.success(scheduleOperator.deleteByGroupIdOpt(groupId, 
operator));
     }
 
 }
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 81b32bb941..60f82953a5 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -98,4 +98,8 @@ cls.manager.endpoint=127.0.0.1
 
 
 manager.url=127.0.0.1:8083
-agent.install.path=
\ No newline at end of file
+agent.install.path=
+
+# schedule engine type
+# supported values: none (no scheduler) and quartz (Quartz scheduler); default is none
+inlong.schedule.engine=none
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 6143155bf0..3e8f329470 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -95,3 +95,6 @@ group.deleted.enabled=false
 # Tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
 
+# schedule engine type
+# supported values: none (no scheduler) and quartz (Quartz scheduler); default is none
+inlong.schedule.engine=none
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index dcab3fb1cf..5ff929c2b8 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -96,3 +96,6 @@ group.deleted.enabled=false
 # Tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
 
+# schedule engine type
+# supported values: none (no scheduler) and quartz (Quartz scheduler); default is none
+inlong.schedule.engine=none
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application.properties 
b/inlong-manager/manager-web/src/main/resources/application.properties
index 6b56dfb3d9..a6eec820ad 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -66,3 +66,7 @@ audit.user.ids=3,4,5,6
 
 # tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
+
+# schedule engine type
+# supported values: none (no scheduler) and quartz (Quartz scheduler); default is none
+inlong.schedule.engine=none
\ No newline at end of file

Reply via email to